CompletableFuture 클래스
CompletableFuture 클래스는 java8에 도입된 Future, CompletionStage 인터페이스의 구현체다.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}
Future
- 비동기적인 작업을 수행
- 해당 작업이 완료되면 결과를 반환하는 인터페이스
- java5부터 java.util.concurrency package에서 비동기의 결과값을 받는 용도로 사용
CompletionStage
- 비동기적인 작업을 수행
- 해당 작업이 완료되면 결과를 처리하거나 다른 CompletionStage를 연결 하는 인터페이스
- 쉽게 말하자면, 하나의 비동기 작업을 수행하고 완료가 되었을 때, 여기에 또다른 작업을 수행할 수 있도록 메서드를 제공하는 인터페이스
- java8에서 CompletableFuturue 클래스와 함께 제공
Future 인터페이스
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService
- 쓰레드 풀을 이용하여 비동기적으로 작업을 실행하고 관리한다.
- 별도의 쓰레드를 생성하고 관리하지 않아도 되므로, 코드를 간결하게 유지 가능하다.
- 쓰레드 풀을 이용하여 자원을 효율적으로 관리한다.
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1) execute
Runnable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행한다.
2) submit
Callable 인터페이스를 구현한 작업 을 쓰레드 풀에서 비동기적으로 실행하고, 해당 작업의 결과를 Future 객체로 반환한다.
3) shutdown
ExecutorService를 종료. 더 이상 task를 받지 않는다.
Executors.java - ExecutorService 생성
▶ 각 메서드별 예제코드 등 자세한 정보는 이전 포스팅을 참고하자!
https://devfunny.tistory.com/807
[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스
Executors 클래스 - Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다. - 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬
devfunny.tistory.com
1) newSingleThreadExecutor
- 단일 쓰레드로 구성된 스레드 풀을 생성한다.
- 한 번에 하나의 작업만 실행한다.
2) newFixedThreadPool
- 고정된 크기의 쓰레드 풀을 생성한다.
- 크기는 인자로 주어진 n과 동일하다.
3) newCachedThreadPool
- 사용 가능한 쓰레드가 없다면 새로 생성해서 작업을 처리하고, 있다면 재사용한다.
- 쓰레드가 일정 시간 사용되지 않으면 회수한다.
4) newScheduledThreadPool
- 스케줄링 기능을 갖춘 고정 크기의 쓰레드 풀을 생성한다.
- 주기적이거나 지연이 발생하는 작업을 실행한다.
5) newWorkStealingPool
- work steal 알고리즘을 사용하는 ForkJoinPool을 생성한다.
Future의 isDone(), isCancelled()
isDone()
- task가 완료되었다면, 원인과 상관없이 true 반환
isCancelled()
- task가 명시적으로 취소된 경우, true 반환
Future의 get()
public class p085_FutureGetExample {
public static void main(String[] args)
throws InterruptedException, ExecutionException {
Future future = p083_FutureHelper.getFuture();
assert !future.isDone();
assert !future.isCancelled();
var result = future.get();
assert result.equals(1);
assert future.isDone();
assert !future.isCancelled();
}
}
- 결과를 구할 때까지 thread가 계속 block된다.
- future에서 무한 루프나 오랜 시간이 걸린다면 thread가 blocking 상태를 유지한다.
Future의 get(long timeout, TimeUnit unit)
public class p086_FutureGetTimeoutExample {
public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
Future future = p083_FutureHelper.getFutureCompleteAfter1s();
// get() ; 비동기 처리가 어려움
var result = future.get(1500, TimeUnit.MILLISECONDS);
assert result.equals(1);
Future futureToTimeout = p083_FutureHelper.getFutureCompleteAfter1s();
Exception exception = null;
try {
futureToTimeout.get(500, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
exception = e;
}
assert exception != null;
}
}
- 결과를 구할 때까지 timeout동안 thread가 block된다.
- timeout이 넘어가도 응답이 반환되지 않으면 TimeoutException 발생한다.
Future의 cancel(boolean mayInterruptIfRunning)
public class p087_FutureCancelExample {
public static void main(String[] args) {
Future future = p083_FutureHelper.getFuture();
// 취소할 수 없는 상황이라면 false 리턴
// mayInterruptIfRunning이 false 라면 시작하지 않은 작업에 대해서만 취소
var successToCancel = future.cancel(true);
assert future.isCancelled();
assert future.isDone();
assert successToCancel;
successToCancel = future.cancel(true);
assert future.isCancelled();
assert future.isDone();
assert !successToCancel;
}
}
- future의 작업 실행을 취소한다.
- 취소할 수 없는 상황이라면 false를 반환한다.
- mayInterruptIfRunning가 false라면 시작하지 않은 작업에 대해서만 취소한다.
Future 인터페이스의 한계
- cancel을 제외하고 외부에서 future를 컨트롤 할 수 없다.
- 반환된 결과를 get() 해서 접근하기 때문에 비동기 처리가 어렵다.
- 완료되거나 에러가 발생했는지 구분하기 어렵다.
public class p089_FutureAmbiguousStateExample {
public static void main(String[] args)
throws InterruptedException, ExecutionException {
Future futureToCancel = p083_FutureHelper.getFuture();
futureToCancel.cancel(true);
assert futureToCancel.isDone();
Future futureWithException = p083_FutureHelper.getFutureWithException();
Exception exception = null;
try {
futureWithException.get();
} catch (ExecutionException e) {
exception = e;
}
// 완료된건지, 에러가 발생한건지 구분하기 어렵다.
assert futureWithException.isDone();
assert exception != null;
}
}
CompletionStage 인터페이스
public interface CompletionStage<T> {
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn,
Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,
Executor executor);
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action,
Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
public <U> CompletionStage<U> applyToEither
(CompletionStage<? extends T> other,
Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync
(CompletionStage<? extends T> other,
Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync
(CompletionStage<? extends T> other,
Function<? super T, U> fn,
Executor executor);
public CompletionStage<Void> acceptEither
(CompletionStage<? extends T> other,
Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync
(CompletionStage<? extends T> other,
Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync
(CompletionStage<? extends T> other,
Consumer<? super T> action,
Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletionStage<Void> runAfterEitherAsync
(CompletionStage<?> other,
Runnable action);
public CompletionStage<Void> runAfterEitherAsync
(CompletionStage<?> other,
Runnable action,
Executor executor);
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor);
public <U> CompletionStage<U> handle
(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync
(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete
(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync
(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync
(BiConsumer<? super T, ? super Throwable> action,
Executor executor);
public CompletionStage<T> exceptionally
(Function<Throwable, ? extends T> fn);
public CompletableFuture<T> toCompletableFuture();
}
아래 예제코드에 사용되는 Helper.java
package com.example04.completablefuture.completionstage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@Slf4j
public class p099_Helper {
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
var future = CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
Thread.sleep(100);
return future;
}
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
public static CompletionStage<Integer> runningStage() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
log.info("I'm running!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
public static CompletionStage<Integer> completionStageAfter1s() {
return CompletableFuture.supplyAsync(() -> {
log.info("getCompletionStage");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
public static CompletionStage<Integer> addOne(int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value + 1;
});
}
public static CompletionStage<String> addResultPrefix(int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "result: " + value;
});
}
public static CompletableFuture<Integer> waitAndReturn(int millis, int value) {
return CompletableFuture.supplyAsync(() -> {
try {
log.info("waitAndReturn: {}ms", millis);
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value;
});
}
}
//public interface Future<V> {
// boolean cancel(boolean mayInterruptIfRunning);
// boolean isCancelled();
// boolean isDone();
// V get() throws InterruptedException, ExecutionException;
// V get(long timeout, TimeUnit unit)
// throws InterruptedException, ExecutionException, TimeoutException;
//}
CompletionStage 연산자 조합
@Slf4j
public class p092_CompletionStageExample {
public static void main(String[] args) throws InterruptedException {
// chaining
p099_Helper.completionStage()
.thenApplyAsync(value -> {
log.info("thenApplyAsync: {}", value);
return value + 1;
}).thenAccept(value -> {
log.info("thenAccept: {}", value);
}).thenRunAsync(() -> {
log.info("thenRun");
}).exceptionally(e -> {
log.info("exceptionally: {}", e.getMessage());
return null;
});
Thread.sleep(100);
}
}
- 50개에 가까운 연산자들을 활용하여 비동기 task들을 실행하고 값을 변형하는 등 chaining을 이용한 조합이 가능하다.
- 에러를 처리하기 위한 콜백을 제공한다.
ForkJoinPool - thread pool
- CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해 ForkJoinPool을 사용한다.
- ForkJoinPool의 기본 size = 할당된 cpu 코어 - 1
- 데몬 쓰레드임으로 main 쓰레드가 종료되면 즉각적으로 종료된다.
ForkJoinPool - fork & join
Task를 fork를 통해서 subtask로 나누고, Thread pool에서 steal work 알고리즘을 이용해서 균등하게 처리해서 join을 통해서 결과를 생성한다.
▶ Steal work 알고리즘
병렬 처리를 위해 작업을 잘게 나누어 각각을 처리하는 별개 쓰레드를 만든다면, 쓰레드를 만들고 제거하는 작업 자체가 큰 오버헤드일 수 있다.
일정 개수의 쓰레드와 병렬처리를 위해 전체 작업 목록을 관리하는 작업 큐를 사용한다면, 이 작업 큐에서 task를 꺼내 병렬로 처리할때 작업 큐에 접근하는것 자체가 경쟁이므로 성능 저하가 발생할 수 있다.
일정한 개수의 쓰레드를 유지하고, 쓰레드마다 독립적인 작업큐를 관리하여, 하나의 쓰레드 큐가 비게되면 다른 쓰레드에서 task를 훔쳐올 수 있게하여 효율적으로 동작할 수 있다.
작업을 잘게 나눌 수 있을 때까지 split하고, 작업 큐에 있는 tail task를 다른 쓰레드가 나누어 병렬처리 한 후, join하여 합산하는 방식이다.
CompletionStage 연산자와 연결
thenAccept(), thenAcceptAsync()
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
- Consumer를 파라미터로 받는다.
- 이전 task로부터 값을 받지만 값을 넘기지 않는다.
- 다음 task에게 null이 전달된다.
- 값을 받아서 action만 수행하는 경우 유용하다.
done 상태의 thenAccept()
done 상태에서 thenAccept는 caller(main)의 쓰레드에서 실행한다.
@Slf4j
public class p100_CompletionStageThenAcceptExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
// done 상태일때 thenAccept를 호출한 caller 쓰레드에서 action 실행
CompletionStage<Integer> stage = p099_Helper.finishedStage();
stage.thenAccept(i -> {
log.info("{} in thenAccept", i);
}).thenAccept(i -> {
log.info("{} in thenAccept2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
var future = CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
Thread.sleep(100);
return future;
}
...
}
실행결과
10:23:58.064 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - start main
10:23:58.069 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
10:23:58.206 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - 1 in thenAccept
10:23:58.207 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - null in thenAccept2
10:23:58.207 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - after thenAccept
done 상태의 thenAcceptAsync()
done 상태의 thenAcceptAsync()를 사용하면 별도의 스레드에서 실행됨을 확인할 수 있다.
@Slf4j
public class p100_CompletionStageThenAcceptAsyncExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
// done 상태일때 thread pool에 있는 쓰레드에서 action 실행
CompletionStage<Integer> stage = p099_Helper.finishedStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
var future = CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
Thread.sleep(100);
return future;
}
...
}
실행결과
10:26:33.829 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - start main
10:26:33.834 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
10:26:33.996 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - after thenAccept
10:26:33.997 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - 1 in thenAcceptAsync
10:26:34.000 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - null in thenAcceptAsync2
thenAccept[Async]의 실행쓰레드
- done 상태에서 thenAccept는 caller(main)의 쓰레드에서 실행한다.
- done 상태의 completionStage에 thenAccept를 사용하는 경우, caller 쓰레드를 block 할 수 있다.
running 상태의 thenAccept()
done 상태가 아닌 thenAccept는 callee(forkJoinPool)의 쓰레드에서 실행한다.
@Slf4j
public class p102_CompletionStageThenAcceptRunningExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
// thenAccept가 호출된 callee 쓰레드에서 action 실행
CompletionStage<Integer> stage = p099_Helper.runningStage();
stage.thenAccept(i -> {
log.info("{} in thenAccept", i);
}).thenAccept(i -> {
log.info("{} in thenAccept2", i);
});
Thread.sleep(2000);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> runningStage() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
log.info("I'm running!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
...
}
실행결과
10:50:22.587 [main] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - start main
10:50:23.637 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - I'm running!
10:50:23.638 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - 1 in thenAccept
10:50:23.640 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - null in thenAccept2
running 상태의 thenAcceptAsync()
@Slf4j
public class p102_CompletionStageThenAcceptAsyncRunningExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
// thread pool에 있는 쓰레드에서 action 실행
CompletionStage<Integer> stage = p099_Helper.runningStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
});
Thread.sleep(2000);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> runningStage() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
log.info("I'm running!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
...
}
실행결과
10:49:23.168 [main] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - start main
10:49:24.199 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - I'm running!
10:49:24.202 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - 1 in thenAcceptAsync
10:49:24.204 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - null in thenAcceptAsync
thenAccept[Async]의 실행쓰레드
- done 상태가 아닌 thenAccept는 callee(forkJoinPool)의 쓰레드에서 실행한다.
- done 상태가 아닌 completionStage에 thenAccept를 사용하는 경우, callee를 block 할 수 있다.
then*[Async]의 실행 스레드
then*Async의 쓰레드풀 변경
- 모든 then*Async 연산자는 executor를 추가 인자로 받는다.
- 이를 통해서 다른 쓰레드풀로 task를 실행할 수 있다.
@Slf4j
public class p105_CompletionStageThenAcceptAsyncExecutorExample {
public static void main(String[] args)
throws InterruptedException {
var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10);
log.info("start main");
CompletionStage<Integer> stage = p099_Helper.completionStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}, fixed).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync2", i);
}, single);
log.info("after thenAccept");
Thread.sleep(200);
single.shutdown();
fixed.shutdown();
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
...
}
실행결과
12:44:13.435 [main] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - start main
12:44:13.440 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
12:44:13.441 [main] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - after thenAccept
12:44:13.440 [pool-2-thread-1] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - 1 in thenAcceptAsync
12:44:13.442 [pool-1-thread-1] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - null in thenAcceptAsync2
thenApply[Async]
- Function을 파라미터로 받는다.
- 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다.
- 다음 task에게 반환했던 값이 전달된다.
- 값을 변형해서 전달해야 하는 경우 유용하다.
@Slf4j
public class p107_CompletionStageThenApplyAsyncExample {
public static void main(String[] args)
throws InterruptedException {
CompletionStage<Integer> stage = p099_Helper.completionStage();
stage.thenApplyAsync(value -> {
var next = value + 1;
log.info("in thenApplyAsync: {}", next);
return next; // add 1
}).thenApplyAsync(value -> {
var next = "result: " + value;
log.info("in thenApplyAsync2: {}", next);
return next;
}).thenApplyAsync(value -> {
var next = value.equals("result: 2");
log.info("in thenApplyAsync3: {}", next);
return next;
}).thenAcceptAsync(value -> log.info("{}", value));
Thread.sleep(100);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
...
}
실행결과
13:35:28.079 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:35:28.082 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync: 2
13:35:28.102 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync2: result: 2
13:35:28.103 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync3: true
13:35:28.103 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - true
thenCompose[Async]
- Function을 파라미터로 받는다.
- 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다.
- 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
- 다른 future를 반환해야하는 경우 유용하다.
@Slf4j
public class p109_CompletionStageThenComposeAsyncExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
CompletionStage<Integer> stage = p099_Helper.completionStage();
stage.thenComposeAsync(value -> {
var next = p099_Helper.addOne(value);
log.info("in thenComposeAsync: {}", next);
return next;
}).thenComposeAsync(value -> {
var next = p099_Helper.addResultPrefix(value);
log.info("in thenComposeAsync2: {}", next);
return next;
}).thenAcceptAsync(value -> {
log.info("{} in thenAcceptAsync", value);
});
Thread.sleep(1000);
log.info("end main");
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
public static CompletionStage<Integer> addOne(int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value + 1;
});
}
public static CompletionStage<String> addResultPrefix(int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "result: " + value;
});
}
...
}
실행결과
13:44:24.224 [main] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - start main
13:44:24.229 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:44:24.231 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - in thenComposeAsync: java.util.concurrent.CompletableFuture@1a026754[Not completed]
13:44:24.362 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - in thenComposeAsync2: java.util.concurrent.CompletableFuture@63ed7ac7[Not completed]
13:44:24.502 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - result: 2 in thenAcceptAsync
13:44:25.306 [main] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - end main
thenRun[Async]
- Runnable을 파라미터로 받는다.
- 이전 task로부터 값을 받지 않고 값을 반환하지 않는다.
- 다음 task에게 null이 전달된다.
- future가 완료되었다는 이벤트를 기록할 때 유용하다.
@Slf4j
public class p110_CompletionStageThenRunAsyncExample {
public static void main(String[] args)
throws InterruptedException {
log.info("start main");
CompletionStage<Integer> stage = p099_Helper.completionStage();
stage.thenRunAsync(() -> {
log.info("in thenRunAsync");
}).thenRunAsync(() -> {
log.info("in thenRunAsync2");
}).thenAcceptAsync(value -> {
log.info("{} in thenAcceptAsync", value);
});
Thread.sleep(100);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
...
}
실행결과
13:48:04.847 [main] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - start main
13:48:04.852 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - in thenRunAsync
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - in thenRunAsync2
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - null in thenAcceptAsync
exceptionally
- Function을 파라미터로 받는다.
- 이전 task에서 발생한 exception을 받아서 처리하고 값을 반환한다.
- 다음 task에게 반환된 값을 전달한다.
- future 파이프에서 발생한 에러를 처리할때 유용하다.
@Slf4j
public class p113_CompletionStageExceptionallyExample {
public static void main(String[] args)
throws InterruptedException {
p099_Helper.completionStage()
.thenApplyAsync(i -> {
log.info("in thenApplyAsync");
return i / 0;
}).exceptionally(e -> {
log.info("{} in exceptionally", e.getMessage());
return 0;
}).thenAcceptAsync(value -> {
log.info("{} in thenAcceptAsync", value);
});
Thread.sleep(1000);
}
}
// p099_Helper.java
@Slf4j
public class p099_Helper {
...
public static CompletionStage<Integer> completionStage() {
return CompletableFuture.supplyAsync(() -> {
log.info("return in future");
return 1;
});
}
...
}
실행결과
13:54:11.069 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:54:11.074 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - in thenApplyAsync
13:54:11.074 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - java.lang.ArithmeticException: / by zero in exceptionally
13:54:11.075 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - 0 in thenAcceptAsync
CompletableFuture 클래스
위에서 본 CompletionStage 인터페이스를 구현한 CompletableFuture 클래스다. CompletionStage는 계산이 완료되면 결과를 설정할 수 있는 메서드는 존재하지 않았는데, CompletableFuture 클래스는 complete(), completeExceptionally() 등의 메서드를 사용하여 계산의 결과나 예외를 수동으로 설정할 수 있다. CompletableFuture 클래스는 완료된 결과의 다양한 조작을 수행할 수 있는 다양한 메서드를 제공하고있다.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}
supplyAsync()
- Supplier를 제공하여 CompletableFuture를 생성 가능하다.
- Supplier의 반환값이 CompletableFuture의 결과다.
@Slf4j
public class p118_CompletableFutureSupplyAsyncExample {
public static void main(String[] args)
throws ExecutionException, InterruptedException {
log.info("start main");
var future = CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
assert !future.isDone();
Thread.sleep(1000);
assert future.isDone();
assert future.get() == 1;
log.info("end main");
}
}
실행결과
14:16:34.800 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - start main
14:16:34.805 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - supplyAsync
14:16:35.877 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - end main
runAsync()
- Runnable를 제공하여 CompletableFuture 를 생성할 수 있다.
- 값을 반환하지 않는다.
- 다음 task에 null이 전달된다.
@Slf4j
public class p118_CompletableFutureRunAsyncExample {
public static void main(String[] args)
throws ExecutionException, InterruptedException {
log.info("start main");
var future = CompletableFuture.runAsync(() -> {
log.info("runAsync");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assert !future.isDone();
Thread.sleep(1000);
assert future.isDone();
assert future.get() == null;
log.info("end main");
}
}
실행결과
14:16:02.678 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - start main
14:16:02.683 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - runAsync
14:16:03.759 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - end main
complete()
CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.
complete에 의해서 상태가 바뀌었다면 true, 아니라면 false를 반환한다.
@Slf4j
public class p120_CompletableFutureCompleteExample {
public static void main(String[] args)
throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = new CompletableFuture<>();
assert !future.isDone();
// 완료되지 않았다면 주어진 값으로 채운다.
var triggered = future.complete(1);
assert future.isDone();
assert triggered;
assert future.get() == 1;
triggered = future.complete(2);
assert future.isDone();
assert !triggered;
assert future.get() == 1;
}
}
CompletableFuture 상태
isCompletedExceptionally()
- Exception에 의해서 complete 되었는지 확인 할 수 있다.
@Slf4j
public class p121_CompletableFutureIsCompletedExceptionallyExample {
public static void main(String[] args)
throws InterruptedException {
var futureWithException = CompletableFuture.supplyAsync(() -> {
return 1 / 0;
});
Thread.sleep(100);
assert futureWithException.isDone();
// exception에 의해서 complete 되었는가?
assert futureWithException.isCompletedExceptionally();
}
}
allOf()
- 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다.
- 모든 completableFuture가 완료되면 상태가 done으로 변경한다.
- Void를 반환하므로 각각의 값에 get으로 접근해야 한다.
@Slf4j
public class p123_CompletableFutureAllOfExample {
public static void main(String[] args)
throws InterruptedException {
var startTime = System.currentTimeMillis();
var firstFuture = p099_Helper.waitAndReturn(100, 1);
var secondFuture = p099_Helper.waitAndReturn(500, 2);
var thirdFuture = p099_Helper.waitAndReturn(1000, 3);
CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
.thenAcceptAsync(v -> {
log.info("after allOf");
try {
log.info("first: {}", firstFuture.get());
log.info("second: {}", secondFuture.get());
log.info("third: {}", thirdFuture.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}).join();
var endTime = System.currentTimeMillis();
log.info("elapsed: {}ms", endTime - startTime);
}
}
실행결과
14:22:31.615 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 1000ms
14:22:31.615 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 100ms
14:22:31.615 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 500ms
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - after allOf
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - first: 1
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - second: 2
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - third: 3
14:22:32.627 [main] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - elapsed: 1019ms
anyOf()
- 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다.
- 주어진 future 중 하나라도 완료되면 상태가 done으로 변경한다.
- 제일 먼저 done 상태가 되는 future의 값을 반환한다.
@Slf4j
public class p125_CompletableFutureAnyOfExample {
public static void main(String[] args)
throws InterruptedException {
var startTime = System.currentTimeMillis();
var firstFuture = p099_Helper.waitAndReturn(100, 1);
var secondFuture = p099_Helper.waitAndReturn(500, 2);
var thirdFuture = p099_Helper.waitAndReturn(1000, 3);
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
.thenAcceptAsync(v -> {
log.info("after anyOf");
log.info("first value: {}", v);
}).join();
var endTime = System.currentTimeMillis();
log.info("elapsed: {}ms", endTime - startTime);
}
}
실행결과
14:23:18.467 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 500ms
14:23:18.467 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 100ms
14:23:18.467 [ForkJoinPool.commonPool-worker-23] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 1000ms
14:23:18.644 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - after anyOf
14:23:18.644 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - first value: 1
14:23:18.644 [main] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - elapsed: 184ms
CompletableFuture의 한계
- 지연 로딩 기능을 제공하지 않는다.
- CompletableFuture를 반환하는 함수를 호출시 즉시 작업이 실행된다.
- 지속적으로 생성되는 데이터를 처리하기 어렵다.
- CompletableFuture에서 데이터를 반환하고 나면 다시 다른 값을 전달하기 어렵다.
Refernece
강의; Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
https://medium.com/@kimkare/java-work-stealing-fork-join-f7c18e8ffa1a
'Coding > Java' 카테고리의 다른 글
[Java] 람다 INVOKEDYNAMIC의 내부 동작에 대한 이해 (0) | 2023.04.18 |
---|---|
플라이웨이트 패턴 (Flyweight Pattern) (0) | 2023.04.02 |
wait()과 notify(), notifyAll() (0) | 2022.08.17 |
[JAVA] ThreadLocal (0) | 2022.08.04 |
[JAVA] Volatile 변수 (0) | 2022.08.03 |