[Spring Reactive Programming] 5. Reactive Streams - CompletableFuture

반응형
728x90
반응형

CompletableFuture

Java5에 등장한 Future의 단점을 보완한 java8의 클래스다. Future은 get()을 호출하여 작업을 완료하고, get()은 비동기 작업의 결과를 가져올때까지 블로킹 상태가 된다. 여러 Future을 조합할 수 없고, 예외 처리도 불가능하여 이를 보완한 CompletableFuture을 사용하여 비동기 작업을 직접 완료할 수 있다.

 

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

CompletableFuture는 CompletionStage 인터페이스를 구현한다. CompletionStage는 비동기 연산을 체이닝으로 연속해서 작업들을 중첩시킬 수 있다. 

 

public interface CompletionStage<T> {
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn)
    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    
    ...
}

1) 작업 완료 상태로 생성

CompletableFuture<Integer> f = CompletableFuture.completedFuture(1);
System.out.println(f.get()); // 1

 

completedFuture()
public static <U> CompletableFuture<U> completedFuture(U value) {
    return new CompletableFuture<U>((value == null) ? NIL : value);
}

 

2) value 설정을 하지 않은 경우 무한대기

CompletableFuture 객체의 get() 메서드를 호출하면 결과를 받아올 때까지 블로킹된다. 이때 최대 블로킹 시간을 설정할 수 있는데, 설정하지 않는다면 결과를 받아올때까지 무한 대기한다.

CompletableFuture<Integer> f2 = new CompletableFuture<>();
//        System.out.println(f2.get()); // 무한 대기

 

3) 객체 생성 후 완료 처리

 CompletableFuture<Integer> f2 = new CompletableFuture<>();
//        System.out.println(f2.get()); // 무한 대기

f2.complete(2); // 완료 처리
System.out.println(f2.get()); // 2

 

complete()
public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}

// completeValue()
final boolean completeValue(T t) {
    return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
}

 

4) 명시적 예외 설정

// 예외 명시적
// 예외 정보를 f2가 담고있을 뿐, 별도로 에러가 발생하진 않는다.
f2.completeExceptionally(new RuntimeException());

 

예외가 발생하는 순간
System.out.println(f2.get()); // 블로킹 - get()을 하는 순간, 예외가 발생되는것

 

 

다양한 메서드 제공

1) runAsync()

Runnable 타입을 파라미터로 전달하여 비동기 작업을 수행한다. 별도의 스레드에서 수행된다.

CompletableFuture.runAsync(() -> log.info("runAsync"));
log.info("exit");

 

runAsync()
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(ASYNC_POOL, runnable);
}

 

▶ 결과

수행 스레드 명 : ForkJoinPool.commonPool-worker-19

스레드명이 ForkJoinPool인데, 이는 기본적으로 ForkJoinPool의 commonPool()을 사용하여 작업을 실행할 쓰레드를 쓰레드 풀로부터 할당 받아 수행한다.

[main] INFO com.reactive.step07.E32_CompletableFuture2 - exit
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E32_CompletableFuture2 - runAsync

 

 

메서드 체이닝

CompletableFuture
            /** 계속 같은 스레드 */
//                .runAsync(() -> log.info("runAsync")) // 수행된 백그라운드 스레드 (결과값을 사용할 수 없다)
            .supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
                log.info("supplyAsync");
//                    고의 에러 발생
//                    if (1 == 1) {
//                        throw new RuntimeException();
//                    }

                return 1;
            })
            .thenApply(s -> { log.info("thenApply1 {}", s);  // 파라미터, 결과값이 존재
                return s + 1;
            })
            .thenCompose(s -> { log.info("thenCompose {}", s);  // 파라미터, 결과값이 존재
                return CompletableFuture.completedFuture(s + 1);
                // CompletableFuture 객체를 리턴하게된다. (그 안에 CompletableFuture)
                // thenCompose 로 써야한다.(flatMap과 유사)
                // CompletableFuture.completedFuture(s + 1) 로 리턴해야하는 상황이 있을때 사용한다.
            })
            .thenApply(s2 -> {
                log.info("thenApply2 {}", s2);
                return s2 * 3;
            }) // 위 결과를 받아서 소모만 한다.
            .exceptionally(e -> -10) // 위 어디서든 예외가 발생하면 예외 처리하고싶다.
            .thenAccept(s3 -> log.info("thenAccept {}", s3))
    ;

    log.info("exit");

1) supplyAsync()

파라미터는 받지 않고 리턴 값이 존재하는 비동기 작업을 수행한다. 비동기 작업의 결과값을 받아올 수 있다.

runAsync()와 마찬가지로, ForkJoinPool의 commonPool()을 사용하여 작업을 실행할 쓰레드를 쓰레드 풀로부터 할당 받아 수행한다.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}

// Supplier
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

 

1을 리턴한다.
.supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
                log.info("supplyAsync");
                return 1;
            })

 

2) thenApply()

파라미터와 결과값 모두 존재한다.

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

// Function
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

 

1을 더한 값을 리턴한다.
.thenApply(s -> { log.info("thenApply1 {}", s);  // 파라미터, 결과값이 존재
    return s + 1;
})

 

3) thenCompose()

파라미터, 결과값이 모두 존재한다.

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

 

CompletableFuture을 리턴하도록 해보자. (스트림의 flatMap과 비슷하다.)

리턴값 : CompletableFuture.completedFuture(s + 1)
.thenCompose(s -> { log.info("thenCompose {}", s);  // 파라미터, 결과값이 존재
    return CompletableFuture.completedFuture(s + 1);
})

 

completedFuture()
public static <U> CompletableFuture<U> completedFuture(U value) {
    return new CompletableFuture<U>((value == null) ? NIL : value);
}

결국, completedFuture()은 작업을 수행하고, 그 작업 결과에 대한 CompletableFuture를 리턴하는 flatMap과 같은 역할을 한다.

 

[예제]

출처 : https://gowoonsori.com/java/completablefuture/

public class App {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
        System.out.println(future.get());

    }
    private static CompletableFuture<String> getWorldFuture(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/

 

4) thenApply()

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

3)번의 결과를 받아서 소모만 한다. 여기서 말하는 소모는, 받은 값을 다른 값으로 변환하여 리턴한다는 뜻이다.

.thenApply(s2 -> {
    log.info("thenApply2 {}", s2);
    return s2 * 3;
})

 

5) exceptionally()

예외가 발생하면 처리한다.

.exceptionally(e -> -10)

 

6) thenAccept()

결과값을 받아서 처리하는 콜백 함수다. CompletableFuture가 complete가 되면, thenAccept()를 수행한다.

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

// Consumer
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

 

▶ 최종 결과값을 출력한다.

.thenAccept(s3 -> log.info("thenAccept {}", s3))

 

 

코드 수행 결과

위 메서드는 동일한 스레드인 'ForkJoinPool.commonPool-worker-19' 으로 수행되었음을 알 수 있다.

[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - supplyAsync
[main] INFO com.reactive.step07.E33_CompletableFuture2 - exit
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenApply1 1
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenCompose 2
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenApply2 3
[ForkJoinPool.commonPool-worker-19] INFO com.reactive.step07.E33_CompletableFuture2 - thenAccept 9

 

 

다른 스레드로 수행시키는 방법

10개의 스레드를 다 쓰고, 앞으로 돌아가서 순서대로 재사용하는 방식

ExecutorService es = Executors.newFixedThreadPool(10);

위 es를 각 메서드의 인자에 설정해주면, ForkJoinPool의 commonPool을 사용하지 않고 별도로 정의한 쓰레드 풀을 사용하게된다.

 

전체 코드
CompletableFuture
                .supplyAsync(() -> { // 파라미터는 받지 않고 리턴값은 존재
                    log.info("supplyAsync");
//                    고의 에러 발생
//                    if (1 == 1) {
//                        throw new RuntimeException();
//                    }

                    return 1;
                }, es)  /** es 명시 */
                /** 다른 스레드로 수행하고 싶다. thenApply -> thenApplyAsync */
                .thenApplyAsync(s -> { log.info("thenApply1 {}", s);  // 파라미터, 결과값이 존재
                    return s + 1;
                }, es) /** es 명시 */
                .thenCompose(s -> { log.info("thenCompose {}", s);  // 파라미터, 결과값이 존재
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply2 {}", s2);
                    return s2 * 3;
                })
                .exceptionally(e -> -10)
                .thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es)  /** es 명시 */
        ;

        log.info("exit");

1) thenApplySync()

.thenApplyAsync(s -> { log.info("thenApply1 {}", s);  // 파라미터, 결과값이 존재
    return s + 1;
}, es)

 

2) thenAcceptAsync()

.thenAcceptAsync(s3 -> log.info("thenAccept {}", s3), es)  /** es 명시 */

 

결과
[pool-1-thread-1] INFO com.reactive.step07.E33_CompletableFuture_otherThread - supplyAsync
[main] INFO com.reactive.step07.E34_CompletableFuture_otherThread - exit
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenApply1 1
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenCompose 2
[pool-1-thread-2] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenApply2 3
[pool-1-thread-3] INFO com.reactive.step07.E33_CompletableFuture_otherThread - thenAccept 9

 

 

반응형

Designed by JB FACTORY