CompletableFuture
Java5에 등장한 Future의 단점을 보완한 java8의 클래스다. Future은 get()을 호출하여 작업을 완료하고, get()은 비동기 작업의 결과를 가져올때까지 블로킹 상태가 된다. 여러 Future을 조합할 수 없고, 예외 처리도 불가능하여 이를 보완한 CompletableFuture을 사용하여 비동기 작업을 직접 완료할 수 있다.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
CompletableFuture는 CompletionStage 인터페이스를 구현한다. CompletionStage는 비동기 연산을 체이닝으로 연속해서 작업들을 중첩시킬 수 있다.
public interface CompletionStage<T> {
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn)
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn,
Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
...
}
1) 작업 완료 상태로 생성
CompletableFuture<Integer> f = CompletableFuture.completedFuture(1);
System.out.println(f.get()); // 1
completedFuture()
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
2) value 설정을 하지 않은 경우 무한대기
CompletableFuture 객체의 get() 메서드를 호출하면 결과를 받아올 때까지 블로킹된다. 이때 최대 블로킹 시간을 설정할 수 있는데, 설정하지 않는다면 결과를 받아올때까지 무한 대기한다.
CompletableFuture<Integer> f2 = new CompletableFuture<>();
// System.out.println(f2.get()); // 무한 대기
3) 객체 생성 후 완료 처리
CompletableFuture<Integer> f2 = new CompletableFuture<>();
// System.out.println(f2.get()); // 무한 대기
f2.complete(2); // 완료 처리
System.out.println(f2.get()); // 2
complete()
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
// completeValue()
final boolean completeValue(T t) {
return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
}
4) 명시적 예외 설정
// 예외 명시적
// 예외 정보를 f2가 담고있을 뿐, 별도로 에러가 발생하진 않는다.
f2.completeExceptionally(new RuntimeException());
예외가 발생하는 순간
System.out.println(f2.get()); // 블로킹 - get()을 하는 순간, 예외가 발생되는것
다양한 메서드 제공
1) runAsync()
Runnable 타입을 파라미터로 전달하여 비동기 작업을 수행한다. 별도의 스레드에서 수행된다.
CompletableFuture.runAsync(() -> log.info("runAsync"));
log.info("exit");
runAsync()
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
▶ 결과
수행 스레드 명 : ForkJoinPool.commonPool-worker-19
스레드명이 ForkJoinPool인데, 이는 기본적으로 ForkJoinPool의 commonPool()을 사용하여 작업을 실행할 쓰레드를 쓰레드 풀로부터 할당 받아 수행한다.
[main] INFO com.reactive.step07.E32_CompletableFuture2 - exit
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E32_CompletableFuture2 - runAsync
메서드 체이닝
CompletableFuture
/** 계속 같은 스레드 */
// .runAsync(() -> log.info("runAsync")) // 수행된 백그라운드 스레드 (결과값을 사용할 수 없다)
.supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
log.info("supplyAsync");
// 고의 에러 발생
// if (1 == 1) {
// throw new RuntimeException();
// }
return 1;
})
.thenApply(s -> { log.info("thenApply1 {}", s); // 파라미터, 결과값이 존재
return s + 1;
})
.thenCompose(s -> { log.info("thenCompose {}", s); // 파라미터, 결과값이 존재
return CompletableFuture.completedFuture(s + 1);
// CompletableFuture 객체를 리턴하게된다. (그 안에 CompletableFuture)
// thenCompose 로 써야한다.(flatMap과 유사)
// CompletableFuture.completedFuture(s + 1) 로 리턴해야하는 상황이 있을때 사용한다.
})
.thenApply(s2 -> {
log.info("thenApply2 {}", s2);
return s2 * 3;
}) // 위 결과를 받아서 소모만 한다.
.exceptionally(e -> -10) // 위 어디서든 예외가 발생하면 예외 처리하고싶다.
.thenAccept(s3 -> log.info("thenAccept {}", s3))
;
log.info("exit");
1) supplyAsync()
파라미터는 받지 않고 리턴 값이 존재하는 비동기 작업을 수행한다. 비동기 작업의 결과값을 받아올 수 있다.
runAsync()와 마찬가지로, ForkJoinPool의 commonPool()을 사용하여 작업을 실행할 쓰레드를 쓰레드 풀로부터 할당 받아 수행한다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
// Supplier
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
1을 리턴한다.
.supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
log.info("supplyAsync");
return 1;
})
2) thenApply()
파라미터와 결과값 모두 존재한다.
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
// Function
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
1을 더한 값을 리턴한다.
.thenApply(s -> { log.info("thenApply1 {}", s); // 파라미터, 결과값이 존재
return s + 1;
})
3) thenCompose()
파라미터, 결과값이 모두 존재한다.
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
CompletableFuture을 리턴하도록 해보자. (스트림의 flatMap과 비슷하다.)
리턴값 : CompletableFuture.completedFuture(s + 1)
.thenCompose(s -> { log.info("thenCompose {}", s); // 파라미터, 결과값이 존재
return CompletableFuture.completedFuture(s + 1);
})
completedFuture()
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
결국, completedFuture()은 작업을 수행하고, 그 작업 결과에 대한 CompletableFuture를 리턴하는 flatMap과 같은 역할을 한다.
[예제]
출처 : https://gowoonsori.com/java/completablefuture/
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorldFuture(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/
4) thenApply()
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
3)번의 결과를 받아서 소모만 한다. 여기서 말하는 소모는, 받은 값을 다른 값으로 변환하여 리턴한다는 뜻이다.
.thenApply(s2 -> {
log.info("thenApply2 {}", s2);
return s2 * 3;
})
5) exceptionally()
예외가 발생하면 처리한다.
.exceptionally(e -> -10)
6) thenAccept()
결과값을 받아서 처리하는 콜백 함수다. CompletableFuture가 complete가 되면, thenAccept()를 수행한다.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
// Consumer
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
▶ 최종 결과값을 출력한다.
.thenAccept(s3 -> log.info("thenAccept {}", s3))
코드 수행 결과
위 메서드는 동일한 스레드인 'ForkJoinPool.commonPool-worker-19' 으로 수행되었음을 알 수 있다.
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - supplyAsync
[main] INFO com.reactive.step07.E33_CompletableFuture2 - exit
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenApply1 1
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenCompose 2
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenApply2 3
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenAccept 9
다른 스레드로 수행시키는 방법
10개의 스레드를 다 쓰고, 앞으로 돌아가서 순서대로 재사용하는 방식
ExecutorService es = Executors.newFixedThreadPool(10);
위 es를 각 메서드의 인자에 설정해주면, ForkJoinPool의 commonPool을 사용하지 않고 별도로 정의한 쓰레드 풀을 사용하게된다.
전체 코드
CompletableFuture
.supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
log.info("supplyAsync");
// 고의 에러 발생
// if (1 == 1) {
// throw new RuntimeException();
// }
return 1;
}, es) /** es 명시 */
/** 다른 스레드로 수행하고 싶다. thenApply -> thenApplyAsync */
.thenApplyAsync(s -> { log.info("thenApply1 {}", s); // 파라미터, 결과값이 존재
return s + 1;
}, es) /** es 명시 */
.thenCompose(s -> { log.info("thenCompose {}", s); // 파라미터, 결과값이 존재
return CompletableFuture.completedFuture(s + 1);
})
.thenApply(s2 -> {
log.info("thenApply2 {}", s2);
return s2 * 3;
})
.exceptionally(e -> -10)
.thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es) /** es 명시 */
;
log.info("exit");
1) thenApplySync()
.thenApplyAsync(s -> { log.info("thenApply1 {}", s); // 파라미터, 결과값이 존재
return s + 1;
}, es)
2) thenAcceptAsync()
.thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es) /** es 명시 */
결과
[pool-1-thread-1] INFO com.reactive.step07.E33_CompletableFuture_otherThread - supplyAsync
[main] INFO com.reactive.step07.E34_CompletableFuture_otherThread - exit
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenApply1 1
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenCompose 2
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenApply2 3
[pool-1-thread-3] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenAccept 9