[Java] CompletableFuture 클래스 (Future, CompletionStage)

반응형
728x90
반응형

CompletableFuture 클래스

CompletableFuture 클래스는 java8에 도입된 Future, CompletionStage 인터페이스의 구현체다.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ...
}

 

Future

  • 비동기적인 작업을 수행
  • 해당 작업이 완료되면 결과를 반환하는 인터페이스
  • java5부터 java.util.concurrency package에서 비동기의 결과값을 받는 용도로 사용

 

CompletionStage

  • 비동기적인 작업을 수행
  • 해당 작업이 완료되면 결과를 처리하거나 다른 CompletionStage를 연결 하는 인터페이스
  • 쉽게 말하자면, 하나의 비동기 작업을 수행하고 완료가 되었을 때, 여기에 또다른 작업을 수행할 수 있도록 메서드를 제공하는 인터페이스
  • java8에서 CompletableFuturue 클래스와 함께 제공

 

 

Future 인터페이스

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 

 

ExecutorService

  • 쓰레드 풀을 이용하여 비동기적으로 작업을 실행하고 관리한다.
  • 별도의 쓰레드를 생성하고 관리하지 않아도 되므로, 코드를 간결하게 유지 가능하다.
  • 쓰레드 풀을 이용하여 자원을 효율적으로 관리한다.
public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
        
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 

1) execute

Runnable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행한다.

 

2) submit

Callable 인터페이스를 구현한 작업 을 쓰레드 풀에서 비동기적으로 실행하고, 해당 작업의 결과를 Future 객체로 반환한다.

 

3) shutdown

ExecutorService를 종료. 더 이상 task를 받지 않는다.

 

 

Executors.java - ExecutorService 생성

▶ 각 메서드별 예제코드 등 자세한 정보는 이전 포스팅을 참고하자!

https://devfunny.tistory.com/807

 

[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스

Executors 클래스 - Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다. - 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬

devfunny.tistory.com

 

1) newSingleThreadExecutor

  • 단일 쓰레드로 구성된 스레드 풀을 생성한다.
  • 한 번에 하나의 작업만 실행한다.

 

2) newFixedThreadPool

  • 고정된 크기의 쓰레드 풀을 생성한다.
  • 크기는 인자로 주어진 n과 동일하다.

 

3) newCachedThreadPool

  • 사용 가능한 쓰레드가 없다면 새로 생성해서 작업을 처리하고, 있다면 재사용한다.
  • 쓰레드가 일정 시간 사용되지 않으면 회수한다.

 

4) newScheduledThreadPool

  • 스케줄링 기능을 갖춘 고정 크기의 쓰레드 풀을 생성한다.
  • 주기적이거나 지연이 발생하는 작업을 실행한다.

 

5) newWorkStealingPool

  • work steal 알고리즘을 사용하는 ForkJoinPool을 생성한다.

 

 

Future의 isDone(), isCancelled()

isDone()

  • task가 완료되었다면, 원인과 상관없이 true 반환

 

isCancelled()

  • task가 명시적으로 취소된 경우, true 반환

 

 

Future의 get()

public class p085_FutureGetExample {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        Future future = p083_FutureHelper.getFuture();
        assert !future.isDone();
        assert !future.isCancelled();

        var result = future.get();
        assert result.equals(1);
        assert future.isDone();
        assert !future.isCancelled();
    }
}
  • 결과를 구할 때까지 thread가 계속 block된다.
  • future에서 무한 루프나 오랜 시간이 걸린다면 thread가 blocking 상태를 유지한다.

 

 

Future의 get(long timeout, TimeUnit unit)

public class p086_FutureGetTimeoutExample {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future future = p083_FutureHelper.getFutureCompleteAfter1s();
        // get() ; 비동기 처리가 어려움
        var result = future.get(1500, TimeUnit.MILLISECONDS);
        assert result.equals(1);

        Future futureToTimeout = p083_FutureHelper.getFutureCompleteAfter1s();
        Exception exception = null;
        try {
            futureToTimeout.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            exception = e;
        }
        assert exception != null;
    }
}
  • 결과를 구할 때까지 timeout동안 thread가 block된다.
  • timeout이 넘어가도 응답이 반환되지 않으면 TimeoutException 발생한다.

 

 

Future의 cancel(boolean mayInterruptIfRunning)

public class p087_FutureCancelExample {
    public static void main(String[] args) {
        Future future = p083_FutureHelper.getFuture();
        // 취소할 수 없는 상황이라면 false 리턴
        // mayInterruptIfRunning이 false 라면 시작하지 않은 작업에 대해서만 취소
        var successToCancel = future.cancel(true);
        assert future.isCancelled();
        assert future.isDone();
        assert successToCancel;

        successToCancel = future.cancel(true);
        assert future.isCancelled();
        assert future.isDone();
        assert !successToCancel;
    }
}
  • future의 작업 실행을 취소한다.
  • 취소할 수 없는 상황이라면 false를 반환한다.
  • mayInterruptIfRunning가 false라면 시작하지 않은 작업에 대해서만 취소한다.

 

Future 인터페이스의 한계

  • cancel을 제외하고 외부에서 future를 컨트롤 할 수 없다.
  • 반환된 결과를 get() 해서 접근하기 때문에 비동기 처리가 어렵다.
  • 완료되거나 에러가 발생했는지 구분하기 어렵다.
public class p089_FutureAmbiguousStateExample {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        Future futureToCancel = p083_FutureHelper.getFuture();
        futureToCancel.cancel(true);
        assert futureToCancel.isDone();

        Future futureWithException = p083_FutureHelper.getFutureWithException();

        Exception exception = null;
        try {
            futureWithException.get();
        } catch (ExecutionException e) {
            exception = e;
        }

        // 완료된건지, 에러가 발생한건지 구분하기 어렵다.
        assert futureWithException.isDone();
        assert exception != null;
    }
}

 

 

CompletionStage 인터페이스

public interface CompletionStage<T> {
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);

    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                 Executor executor);

    public CompletionStage<Void> thenRun(Runnable action);

    public CompletionStage<Void> thenRunAsync(Runnable action);

    public CompletionStage<Void> thenRunAsync(Runnable action,
                                              Executor executor);

    public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

    public <U,V> CompletionStage<V> thenCombineAsync
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

    public <U,V> CompletionStage<V> thenCombineAsync
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor);

    public <U> CompletionStage<Void> thenAcceptBoth
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action);

    public <U> CompletionStage<Void> thenAcceptBothAsync
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action);

    public <U> CompletionStage<Void> thenAcceptBothAsync
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action,
         Executor executor);

    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
                                              Runnable action);

    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                   Runnable action);

    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor);

    public <U> CompletionStage<U> applyToEither
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn);

    public <U> CompletionStage<U> applyToEitherAsync
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn);

    public <U> CompletionStage<U> applyToEitherAsync
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn,
         Executor executor);

    public CompletionStage<Void> acceptEither
        (CompletionStage<? extends T> other,
         Consumer<? super T> action);

    public CompletionStage<Void> acceptEitherAsync
        (CompletionStage<? extends T> other,
         Consumer<? super T> action);

    public CompletionStage<Void> acceptEitherAsync
        (CompletionStage<? extends T> other,
         Consumer<? super T> action,
         Executor executor);

    public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                                Runnable action);

    public CompletionStage<Void> runAfterEitherAsync
        (CompletionStage<?> other,
         Runnable action);

    public CompletionStage<Void> runAfterEitherAsync
        (CompletionStage<?> other,
         Runnable action,
         Executor executor);

    public <U> CompletionStage<U> thenCompose
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn,
         Executor executor);

    public <U> CompletionStage<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn);

    public <U> CompletionStage<U> handleAsync
        (BiFunction<? super T, Throwable, ? extends U> fn);

    public CompletionStage<T> whenComplete
        (BiConsumer<? super T, ? super Throwable> action);

    public CompletionStage<T> whenCompleteAsync
        (BiConsumer<? super T, ? super Throwable> action);

    public CompletionStage<T> whenCompleteAsync
        (BiConsumer<? super T, ? super Throwable> action,
         Executor executor);

    public CompletionStage<T> exceptionally
        (Function<Throwable, ? extends T> fn);

    public CompletableFuture<T> toCompletableFuture();

}

 

 

아래 예제코드에 사용되는 Helper.java

package com.example04.completablefuture.completionstage;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@Slf4j
public class p099_Helper {
    @SneakyThrows
    public static CompletionStage<Integer> finishedStage() {
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
        Thread.sleep(100);
        return future;
    }

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }

    public static CompletionStage<Integer> runningStage() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                log.info("I'm running!");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
    }

    public static CompletionStage<Integer> completionStageAfter1s() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("getCompletionStage");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
    }

    public static CompletionStage<Integer> addOne(int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value + 1;
        });
    }

    public static CompletionStage<String> addResultPrefix(int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "result: " + value;
        });
    }

    public static CompletableFuture<Integer> waitAndReturn(int millis, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                log.info("waitAndReturn: {}ms", millis);
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

//public interface Future<V> {
//    boolean cancel(boolean mayInterruptIfRunning);
//    boolean isCancelled();
//    boolean isDone();
//    V get() throws InterruptedException, ExecutionException;
//    V get(long timeout, TimeUnit unit)
//            throws InterruptedException, ExecutionException, TimeoutException;
//}

 

 

CompletionStage 연산자 조합

@Slf4j
public class p092_CompletionStageExample {
    public static void main(String[] args) throws InterruptedException {
        // chaining
        p099_Helper.completionStage()
                .thenApplyAsync(value -> {
                    log.info("thenApplyAsync: {}", value);
                    return value + 1;
                }).thenAccept(value -> {
                    log.info("thenAccept: {}", value);
                }).thenRunAsync(() -> {
                    log.info("thenRun");
                }).exceptionally(e -> {
                    log.info("exceptionally: {}", e.getMessage());
                    return null;
                });

        Thread.sleep(100);
    }
}
  • 50개에 가까운 연산자들을 활용하여 비동기 task들을 실행하고 값을 변형하는 등 chaining을 이용한 조합이 가능하다.
  • 에러를 처리하기 위한 콜백을 제공한다.

 

 

ForkJoinPool - thread pool

  • CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해 ForkJoinPool을 사용한다.
  • ForkJoinPool의 기본 size = 할당된 cpu 코어 - 1
  • 데몬 쓰레드임으로 main 쓰레드가 종료되면 즉각적으로 종료된다.

 

 

ForkJoinPool - fork & join

Task를 fork를 통해서 subtask로 나누고, Thread pool에서 steal work 알고리즘을 이용해서 균등하게 처리해서 join을 통해서 결과를 생성한다.

 

▶ Steal work 알고리즘

병렬 처리를 위해 작업을 잘게 나누어 각각을 처리하는 별개 쓰레드를 만든다면, 쓰레드를 만들고 제거하는 작업 자체가 큰 오버헤드일 수 있다. 

일정 개수의 쓰레드와 병렬처리를 위해 전체 작업 목록을 관리하는 작업 큐를 사용한다면, 이 작업 큐에서 task를 꺼내 병렬로 처리할때 작업 큐에 접근하는것 자체가 경쟁이므로 성능 저하가 발생할 수 있다.

일정한 개수의 쓰레드를 유지하고, 쓰레드마다 독립적인 작업큐를 관리하여, 하나의 쓰레드 큐가 비게되면 다른 쓰레드에서 task를 훔쳐올 수 있게하여 효율적으로 동작할 수 있다.

 

작업을 잘게 나눌 수 있을 때까지 split하고, 작업 큐에 있는 tail task를 다른 쓰레드가 나누어 병렬처리 한 후, join하여 합산하는 방식이다.

 

 

 

CompletionStage 연산자와 연결

 

 

 

thenAccept(), thenAcceptAsync()

public CompletionStage<Void> thenAccept(Consumer<? super T> action);

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                             Executor executor);
  • Consumer를 파라미터로 받는다.
  • 이전 task로부터 값을 받지만 값을 넘기지 않는다.
  • 다음 task에게 null이 전달된다.
  • 값을 받아서 action만 수행하는 경우 유용하다.

 

 

done 상태의 thenAccept()

done 상태에서 thenAccept는 caller(main)의 쓰레드에서 실행한다.

@Slf4j
public class p100_CompletionStageThenAcceptExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");

        // done 상태일때 thenAccept를 호출한 caller 쓰레드에서 action 실행
        CompletionStage<Integer> stage = p099_Helper.finishedStage();
        stage.thenAccept(i -> {
            log.info("{} in thenAccept", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2", i);
        });
        log.info("after thenAccept");

        Thread.sleep(100);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    @SneakyThrows
    public static CompletionStage<Integer> finishedStage() {
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
        Thread.sleep(100);
        return future;
    }
    
    ...
}

 

실행결과
10:23:58.064 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - start main
10:23:58.069 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
10:23:58.206 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - 1 in thenAccept
10:23:58.207 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - null in thenAccept2
10:23:58.207 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptExample - after thenAccept

 

 

done 상태의 thenAcceptAsync()

done 상태의 thenAcceptAsync()를 사용하면 별도의 스레드에서 실행됨을 확인할 수 있다.

@Slf4j
public class p100_CompletionStageThenAcceptAsyncExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");

        // done 상태일때 thread pool에 있는 쓰레드에서 action 실행
        CompletionStage<Integer> stage = p099_Helper.finishedStage();
        stage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2", i);
        });
        log.info("after thenAccept");

        Thread.sleep(100);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    @SneakyThrows
    public static CompletionStage<Integer> finishedStage() {
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
        Thread.sleep(100);
        return future;
    }
    
    ...
}

 

실행결과
10:26:33.829 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - start main
10:26:33.834 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
10:26:33.996 [main] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - after thenAccept
10:26:33.997 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - 1 in thenAcceptAsync
10:26:34.000 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p100_CompletionStageThenAcceptAsyncExample - null in thenAcceptAsync2

 

 

thenAccept[Async]의 실행쓰레드

  • done 상태에서 thenAccept는 caller(main)의 쓰레드에서 실행한다.
  • done 상태의 completionStage에 thenAccept를 사용하는 경우, caller 쓰레드를 block 할 수 있다.

 

 

running 상태의 thenAccept()

done 상태가 아닌 thenAccept는 callee(forkJoinPool)의 쓰레드에서 실행한다.

@Slf4j
public class p102_CompletionStageThenAcceptRunningExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");

        // thenAccept가 호출된 callee 쓰레드에서 action 실행
        CompletionStage<Integer> stage = p099_Helper.runningStage();
        stage.thenAccept(i -> {
            log.info("{} in thenAccept", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2", i);
        });

        Thread.sleep(2000);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> runningStage() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                log.info("I'm running!");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
    }
    
    ...
}

 

실행결과
10:50:22.587 [main] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - start main
10:50:23.637 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - I'm running!
10:50:23.638 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - 1 in thenAccept
10:50:23.640 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptRunningExample - null in thenAccept2

 

running 상태의 thenAcceptAsync()

@Slf4j
public class p102_CompletionStageThenAcceptAsyncRunningExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");

        // thread pool에 있는 쓰레드에서 action 실행
        CompletionStage<Integer> stage = p099_Helper.runningStage();
        stage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        });

        Thread.sleep(2000);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> runningStage() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                log.info("I'm running!");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
    }
    
    ...
}

 

실행결과
10:49:23.168 [main] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - start main
10:49:24.199 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - I'm running!
10:49:24.202 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - 1 in thenAcceptAsync
10:49:24.204 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p102_CompletionStageThenAcceptAsyncRunningExample - null in thenAcceptAsync

 

 

thenAccept[Async]의 실행쓰레드

  • done 상태가 아닌 thenAccept는 callee(forkJoinPool)의 쓰레드에서 실행한다.
  • done 상태가 아닌 completionStage에 thenAccept를 사용하는 경우, callee를 block 할 수 있다.

 

 

then*[Async]의 실행 스레드

 

 

then*Async의 쓰레드풀 변경

  • 모든 then*Async 연산자는 executor를 추가 인자로 받는다.
  • 이를 통해서 다른 쓰레드풀로 task를 실행할 수 있다.
@Slf4j
public class p105_CompletionStageThenAcceptAsyncExecutorExample {
    public static void main(String[] args)
            throws InterruptedException {

        var single = Executors.newSingleThreadExecutor();
        var fixed = Executors.newFixedThreadPool(10);

        log.info("start main");
        CompletionStage<Integer> stage = p099_Helper.completionStage();
        stage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        }, fixed).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2", i);
        }, single);
        log.info("after thenAccept");
        Thread.sleep(200);

        single.shutdown();
        fixed.shutdown();
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }
    
    ...
    
}

 

실행결과
12:44:13.435 [main] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - start main
12:44:13.440 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
12:44:13.441 [main] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - after thenAccept
12:44:13.440 [pool-2-thread-1] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - 1 in thenAcceptAsync
12:44:13.442 [pool-1-thread-1] INFO com.example04.completablefuture.completionstage.p105_CompletionStageThenAcceptAsyncExecutorExample - null in thenAcceptAsync2

 

 

thenApply[Async]

  • Function을 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다.
  • 다음 task에게 반환했던 값이 전달된다.
  • 값을 변형해서 전달해야 하는 경우 유용하다.
@Slf4j
public class p107_CompletionStageThenApplyAsyncExample {
    public static void main(String[] args)
            throws InterruptedException {
        CompletionStage<Integer> stage = p099_Helper.completionStage();
        stage.thenApplyAsync(value -> {
            var next = value + 1;
            log.info("in thenApplyAsync: {}", next);
            return next; // add 1
        }).thenApplyAsync(value -> {
            var next = "result: " + value;
            log.info("in thenApplyAsync2: {}", next);
            return next;
        }).thenApplyAsync(value -> {
            var next = value.equals("result: 2");
            log.info("in thenApplyAsync3: {}", next);
            return next;
        }).thenAcceptAsync(value -> log.info("{}", value));

        Thread.sleep(100);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }
    
    ...
    
}

 

실행결과
13:35:28.079 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:35:28.082 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync: 2
13:35:28.102 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync2: result: 2
13:35:28.103 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - in thenApplyAsync3: true
13:35:28.103 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p107_CompletionStageThenApplyAsyncExample - true

 

 

thenCompose[Async]

  • Function을 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다.
  • 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
  • 다른 future를 반환해야하는 경우 유용하다.
@Slf4j
public class p109_CompletionStageThenComposeAsyncExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");
        CompletionStage<Integer> stage = p099_Helper.completionStage();
        stage.thenComposeAsync(value -> {
            var next = p099_Helper.addOne(value);
            log.info("in thenComposeAsync: {}", next);
            return next;
        }).thenComposeAsync(value -> {
            var next = p099_Helper.addResultPrefix(value);
            log.info("in thenComposeAsync2: {}", next);
            return next;
        }).thenAcceptAsync(value -> {
            log.info("{} in thenAcceptAsync", value);
        });

        Thread.sleep(1000);
        log.info("end main");
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }
    
    public static CompletionStage<Integer> addOne(int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value + 1;
        });
    }

    public static CompletionStage<String> addResultPrefix(int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "result: " + value;
        });
    }
    
    ...
    
}

 

실행결과
13:44:24.224 [main] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - start main
13:44:24.229 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:44:24.231 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - in thenComposeAsync: java.util.concurrent.CompletableFuture@1a026754[Not completed]
13:44:24.362 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - in thenComposeAsync2: java.util.concurrent.CompletableFuture@63ed7ac7[Not completed]
13:44:24.502 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - result: 2 in thenAcceptAsync
13:44:25.306 [main] INFO com.example04.completablefuture.completionstage.p109_CompletionStageThenComposeAsyncExample - end main

 

 

thenRun[Async]

  • Runnable을 파라미터로 받는다.
  • 이전 task로부터 값을 받지 않고 값을 반환하지 않는다.
  • 다음 task에게 null이 전달된다.
  • future가 완료되었다는 이벤트를 기록할 때 유용하다.
@Slf4j
public class p110_CompletionStageThenRunAsyncExample {
    public static void main(String[] args)
            throws InterruptedException {
        log.info("start main");
        CompletionStage<Integer> stage = p099_Helper.completionStage();
        stage.thenRunAsync(() -> {
            log.info("in thenRunAsync");
        }).thenRunAsync(() -> {
            log.info("in thenRunAsync2");
        }).thenAcceptAsync(value -> {
            log.info("{} in thenAcceptAsync", value);
        });

        Thread.sleep(100);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }
    
    ...
    
}

 

실행결과
13:48:04.847 [main] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - start main
13:48:04.852 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - in thenRunAsync
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - in thenRunAsync2
13:48:04.853 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p110_CompletionStageThenRunAsyncExample - null in thenAcceptAsync

 

 

exceptionally

  • Function을 파라미터로 받는다.
  • 이전 task에서 발생한 exception을 받아서 처리하고 값을 반환한다.
  • 다음 task에게 반환된 값을 전달한다.
  • future 파이프에서 발생한 에러를 처리할때 유용하다.
@Slf4j
public class p113_CompletionStageExceptionallyExample {
    public static void main(String[] args)
            throws InterruptedException {
        p099_Helper.completionStage()
                .thenApplyAsync(i -> {
                    log.info("in thenApplyAsync");
                    return i / 0;
                }).exceptionally(e -> {
                    log.info("{} in exceptionally", e.getMessage());
                    return 0;
                }).thenAcceptAsync(value -> {
                    log.info("{} in thenAcceptAsync", value);
                });

        Thread.sleep(1000);
    }
}

// p099_Helper.java
@Slf4j
public class p099_Helper {
    ...

    public static CompletionStage<Integer> completionStage() {
        return CompletableFuture.supplyAsync(() -> {
            log.info("return in future");
            return 1;
        });
    }
    
    ...
    
}

 

실행결과
13:54:11.069 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - return in future
13:54:11.074 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - in thenApplyAsync
13:54:11.074 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - java.lang.ArithmeticException: / by zero in exceptionally
13:54:11.075 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p113_CompletionStageExceptionallyExample - 0 in thenAcceptAsync

 

 

CompletableFuture 클래스

위에서 본 CompletionStage 인터페이스를 구현한 CompletableFuture 클래스다. CompletionStage는 계산이 완료되면 결과를 설정할 수 있는 메서드는 존재하지 않았는데, CompletableFuture 클래스는 complete(), completeExceptionally() 등의 메서드를 사용하여 계산의 결과나 예외를 수동으로 설정할 수 있다. CompletableFuture 클래스는 완료된 결과의 다양한 조작을 수행할 수 있는 다양한 메서드를 제공하고있다.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ...
}

 

 

supplyAsync()

  • Supplier를 제공하여 CompletableFuture를 생성 가능하다.
  • Supplier의 반환값이 CompletableFuture의 결과다.
@Slf4j
public class p118_CompletableFutureSupplyAsyncExample {
    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        log.info("start main");
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
        assert !future.isDone();

        Thread.sleep(1000);

        assert future.isDone();
        assert future.get() == 1;

        log.info("end main");
    }
}

 

실행결과
14:16:34.800 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - start main
14:16:34.805 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - supplyAsync
14:16:35.877 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureSupplyAsyncExample - end main

 

 

runAsync()

  • Runnable를 제공하여 CompletableFuture 를 생성할 수 있다.
  • 값을 반환하지 않는다.
  • 다음 task에 null이 전달된다.
@Slf4j
public class p118_CompletableFutureRunAsyncExample {
    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        log.info("start main");
        var future = CompletableFuture.runAsync(() -> {
            log.info("runAsync");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        assert !future.isDone();

        Thread.sleep(1000);
        assert future.isDone();
        assert future.get() == null;

        log.info("end main");
    }
}

 

실행결과
14:16:02.678 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - start main
14:16:02.683 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - runAsync
14:16:03.759 [main] INFO com.example04.completablefuture.completablefuture.p118_CompletableFutureRunAsyncExample - end main

 

 

complete()

CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.

complete에 의해서 상태가 바뀌었다면 true, 아니라면 false를 반환한다.

@Slf4j
public class p120_CompletableFutureCompleteExample {
    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        assert !future.isDone();

        // 완료되지 않았다면 주어진 값으로 채운다.
        var triggered = future.complete(1);
        assert future.isDone();
        assert triggered;
        assert future.get() == 1;

        triggered = future.complete(2);
        assert future.isDone();
        assert !triggered;
        assert future.get() == 1;
    }
}

 

 

CompletableFuture 상태

 

 

isCompletedExceptionally()

  • Exception에 의해서 complete 되었는지 확인 할 수 있다.
@Slf4j
public class p121_CompletableFutureIsCompletedExceptionallyExample {
    public static void main(String[] args)
            throws InterruptedException {
        var futureWithException = CompletableFuture.supplyAsync(() -> {
            return 1 / 0;
        });
        Thread.sleep(100);
        assert futureWithException.isDone();
        // exception에 의해서 complete 되었는가?
        assert futureWithException.isCompletedExceptionally();
    }
}

 

 

allOf()

  • 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다.
  • 모든 completableFuture가 완료되면 상태가 done으로 변경한다.
  • Void를 반환하므로 각각의 값에 get으로 접근해야 한다.
@Slf4j
public class p123_CompletableFutureAllOfExample {
    public static void main(String[] args)
            throws InterruptedException {
        var startTime = System.currentTimeMillis();
        var firstFuture = p099_Helper.waitAndReturn(100, 1);
        var secondFuture = p099_Helper.waitAndReturn(500, 2);
        var thirdFuture = p099_Helper.waitAndReturn(1000, 3);

        CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    log.info("after allOf");
                    try {
                        log.info("first: {}", firstFuture.get());
                        log.info("second: {}", secondFuture.get());
                        log.info("third: {}", thirdFuture.get());
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();
        log.info("elapsed: {}ms", endTime - startTime);
    }
}

 

실행결과
14:22:31.615 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 1000ms
14:22:31.615 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 100ms
14:22:31.615 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 500ms
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - after allOf
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - first: 1
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - second: 2
14:22:32.623 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - third: 3
14:22:32.627 [main] INFO com.example04.completablefuture.completablefuture.p123_CompletableFutureAllOfExample - elapsed: 1019ms

 

 

anyOf()

  • 여러 completableFuture를 모아서 하나의 completableFuture로 변환할 수 있다.
  • 주어진 future 중 하나라도 완료되면 상태가 done으로 변경한다.
  • 제일 먼저 done 상태가 되는 future의 값을 반환한다.
@Slf4j
public class p125_CompletableFutureAnyOfExample {
    public static void main(String[] args)
            throws InterruptedException {
        var startTime = System.currentTimeMillis();
        var firstFuture = p099_Helper.waitAndReturn(100, 1);
        var secondFuture = p099_Helper.waitAndReturn(500, 2);
        var thirdFuture = p099_Helper.waitAndReturn(1000, 3);

        CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    log.info("after anyOf");
                    log.info("first value: {}", v);
                }).join();

        var endTime = System.currentTimeMillis();
        log.info("elapsed: {}ms", endTime - startTime);
    }
}

 

실행결과
14:23:18.467 [ForkJoinPool.commonPool-worker-5] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 500ms
14:23:18.467 [ForkJoinPool.commonPool-worker-19] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 100ms
14:23:18.467 [ForkJoinPool.commonPool-worker-23] INFO com.example04.completablefuture.completionstage.p099_Helper - waitAndReturn: 1000ms
14:23:18.644 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - after anyOf
14:23:18.644 [ForkJoinPool.commonPool-worker-9] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - first value: 1
14:23:18.644 [main] INFO com.example04.completablefuture.completablefuture.p125_CompletableFutureAnyOfExample - elapsed: 184ms

 

 

CompletableFuture의 한계

  • 지연 로딩 기능을 제공하지 않는다.
    • CompletableFuture를 반환하는 함수를 호출시 즉시 작업이 실행된다.
  • 지속적으로 생성되는 데이터를 처리하기 어렵다.
    • CompletableFuture에서 데이터를 반환하고 나면 다시 다른 값을 전달하기 어렵다.

 

 

 

 

 

 

Refernece

강의; Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지

https://medium.com/@kimkare/java-work-stealing-fork-join-f7c18e8ffa1a

 

 

 

반응형

'Coding > Java' 카테고리의 다른 글

[Java] 람다 INVOKEDYNAMIC의 내부 동작에 대한 이해  (0) 2023.04.18
플라이웨이트 패턴 (Flyweight Pattern)  (0) 2023.04.02
wait()과 notify(), notifyAll()  (0) 2022.08.17
[JAVA] ThreadLocal  (0) 2022.08.04
[JAVA] Volatile 변수  (0) 2022.08.03

Designed by JB FACTORY