[JAVA8 병렬프로그래밍] Future와 CompletableFuture

반응형
728x90
반응형

Future

포크/조인 프레임워크에서 살펴본 RecursiveTask와 RecursiveAction의 명세서를 살펴보면 두 클래스 모두 Future 인터페이스를 구현한 추상 클래스이며 Future 인터페이스는 자바 5에서 공개한 컨커런트 API에 포함되어있다.

Future 인터페이스는 비동기 연산의 결과를 표현한다. 해당 인터페이스에서 제공하는 메서드 목록을 보면 연산 작업이 완료되었는지 확인하고, 완료될 때까지 대기하고, 모든 연산이 완료된 후의 결과를 조회하는 기능을 제공한다.  

 

제공 메서드

메서드 설명
cancel() 현재 task의 중단을 시도한다.
isDone(), isCancelled() 비동기 연산이 종료 혹은 취소되었는지 확인한다.
get() 결괏값을 응답받을 때까지 대기한다.

 

 

예제코드

FutureExample.java
package org.example.future;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureExample {
    // 제곱을 계산하는 Callable 객체를 생성한다.
    public Callable<Long> calSquare(long value) {
        Callable<Long> callable = new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                Long returnValue = value * value;
                TimeUnit.SECONDS.sleep(1);

                System.out.println(value + "의 제곱근은 " + returnValue);

                return returnValue;
            }
        };

        return callable;
    }

    public void executeTest() {
        List<Long> sampleDataList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        List<Future<Long>> futureList = new ArrayList<>();

        // 쓰레드 풀을 생성한다. (고정 스레드 풀 이용)
        ExecutorService servicePool = Executors.newFixedThreadPool(4);

        // Callable 객체를 생성한 후 쓰레드 풀에 등록한다. 등록된 쓰레드에 대해 Future 객체를 리턴 받는다.
        for(Long sampleValue : sampleDataList) {
            /*
               1) Runnable : 메서드가 void형인 run 메서드만 있다.
               2) Callable : 제네릭으로 정의한 리턴 타입을 가지는 call 메서드가 제공된다.
               (비동기로 데이터를 처리한 이후에 그 결과를 리턴할 필요가 있다면 Callable을 이용해야한다.)
             */
            Future<Long> future = servicePool.submit(calSquare(sampleValue));
            futureList.add(future);
        }

        Long sumValue = 0L;

        // Future 목록의 결과를 확인한다.
        for(Future<Long> future : futureList) {
            try {
                // 결과를 읽어들일 때가지 대기한다.
                // 대기 하는 동안, 쓰레드가 계산을 하고 값을 리턴한다.
                sumValue += future.get();
            }
            catch(ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("최종 합계 : " + sumValue);

        servicePool.shutdown();
    }

    public static void main(String[] args) {
        FutureExample futureExample = new FutureExample();
        futureExample.executeTest();
    }
}
1) Callable 인터페이스의 구현체를 실행시키기 위한 스레드 풀을 정의하기 위해 ExecutorService 객체를 생성한다.
2) 스레드 풀인 ExecutorService에 Callable 구현체를 등록하고 Future를 리턴받는다.
3) 리턴받은 Future는 향후에 값을 확인하기 위해 List와 같은 컬렉션에 등록한다.
4) Future로 연산의 결과를 확인하기 위해 get 메서드를 호출한다. get 메서드는 비동기 연산이 종료될때까지 대기한다.

 

Future 인터페이스는 비동기 작업의 실행 종료 여부를 확인할 수 있고, 그 실행 결과를 리턴받을 수도 있으며 실행시킨 모든 비동기 연산이 끝날 때까지 대기시킬 수도 있다.

 

 

 

CompletableFuture

자바8에 추가된 기능이다. Future 인터페이스의 구현체 중 하나다. Future 인터페이스의 기능 외에 비동기 연산 간의 관계를 정의하거나 연관 결과를 수집, 조합하는 등의 작업이 추가로 필요하여 CompletableFuture가 추가되었다. 

 

장점

1) 스레드의 선언 없이도 비동기 연산 작업을 구현할 수 있고 병렬 프로그래밍이 가능하다.

2) 람다 표현식과 함수형 프로그래밍을 사용할 수 있어서 코드의 양을 현저히 줄일 수 있다.

3) 파이프라인 형태로 작업들을 연결할 수 있어서, 비동기 작업의 순서를 정의하고 관리할 수 있다.

 

메서드

메서드 설명
runAsync Runnable 구현체를 이용해서 비동기 연산 작업을 하기 위한 새로운 CompletableFuture 객체를 리턴한다.
supplyAsync Supplier 함수형 인터페이스의 구현체를 이용해서 비동기 연산 작업을 위한 새로운 CompletableFuture 객체를 리턴한다.
* Supplier 인터페이스는 자바에서 기본 제공하는 함수형 인터페이스이며, 입력 파라미터는 없고 리턴 값만 있다. 
thenAccept 현재 단계가 성공적으로 종료되었을 경우, 메서드의 파라미터로 전달된 Consumer 함수형 인터페이스의 구현체를 실행하기 위한 CompletionStage 객체를 리턴한다.
thenRun 현재 단계가 성공적으로 종료되었을 경우, 메서드의 파라미터로 전달된 Runnable 구현체를 실행하기 위한 ComletionStage 객체를 리턴한다.
complete 현재 태스크를 종료하며 만일 태스크가 동작 중이라면 get 메서드와 동일하게 종료될때까지 대기하고, 최종 태스크 결과를 리턴한다.

CompletableFuture 클래스가 Future 인터페이스의 구현체이기 때문에 Future에서 제공하는 get, isDone, isCancelled 메서드도 함께 제공한다. 

 

 

 

예제코드

CompletabeFuture.java
package org.example.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureExample {

    public static void main(String[] args) {
        // 첫번째 Runnable 인터페이스 정의 
        Runnable mainTask = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) { }

            System.out.println("Main Task : " + Thread.currentThread().getName());
        };

        // 두번째 Runnable 인터페이스 정의 
        Runnable subTask =
                () -> System.out.println("Next Task : " + Thread.currentThread().getName());

        // ExecutorService 정의 
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 두개의 Runnable 작업을 등록하고 실행시킨다.
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
    }
}

1) thenRun()

CompletableFuture 객체에 태스크를 연결할 경우 사용하며, 여러번 반복해서 원하는 만큼 연결 작업을 할 수 있다. 

리턴 값은 CompletableFuture이며 파라미터로 Runnable 구현체를 받는다.

 

2) Executors 클래스 사용

CompletableFuture 클래스도 컨커런트 API의 일부이기 때문에 실행하기 위해서는 스레드 풀을 생성해야한다. 

 

메서드 설명
runAsync Runnable 인터페이스 구현체를 실행시킨다. run 메서드가 void 타입이기 때문에 값을 외부에 리턴할 수 없다.
supplyAsync Supplier 인터페이스 구현체를 실행시킨다.
* Supplier 인터페이스는 자바에서 기본 제공하는 함수형 인터페이스이며, 입력 파라미터는 없고 리턴 값만 있다. 
리턴 객체를 받아서 결과를 확인할 수 있다.

리턴받은 CompletableFuture 객체를 이용해서 새로운 태스크를 등록하고 이전 태스크와 연결할 수 있다. 위 두 메서드는 static 메서드이다. 객체를 생성하지 않아도 호출이 가능하며, 그 결과로 새로운 CompletableFuture 객체를 리턴한다.

 

 

 

비동기 방식 구현 예제

InsuranceCalculator.java
package org.example.future;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InsuranceCalculator {
    public int calculatePrice(Map condition) {
        // 기본 가격
        int price = 10000;

        // 보험료 계산하는 로직 대신 10초 대기하는 것으로 대체한다.
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch(Exception e) {}

        // 임의의 가격을 리턴한다.
        return price;
    }

    public Future<Integer> calculatePriceAsync(Map condition) {
        CompletableFuture<Integer> future = new CompletableFuture<>();

        // 비동기 처리를 위해 new Thread를 이용해서 스레드를 생성했다.
        // 쓰레드를 생성하고 실행할 작업을 CompletableFuture에 등록한다.
        new Thread(() -> {
            int price = calculatePrice(condition);

            // 해당 스레드에 대한 참조를 얻기 위해서 CompletableFuture 클래스의 complete 메서드를 이용하여 작업을 등록한다.
            future.complete(price);
        }).start();

        return future;
    }


    public static void main(String[] args) {
        InsuranceCalculator cal = new InsuranceCalculator();

        // 동기 방식 처리
        System.out.println("동기 방식 호출");

        for (int i = 0 ; i < 5 ; i++) {
            System.out.printf("계산 결과 : %s\n", cal.calculatePrice(null));
        }

        /**
         * calculatePrice 메서드는 그대로 두고, 이를 호출하는 로직을 비동기로 전환한다.
         */
        // 비동기 방식 처리 1
        System.out.println("비동기 방식 호출 1");

        List<Future<Integer>> futureList = new ArrayList<>();
        ExecutorService service = Executors.newFixedThreadPool(5);

        for(int i = 0 ; i < 5 ; i++) {
            // 비동기 처리
            Future<Integer> future = service.submit(() -> {
                return new InsuranceCalculator().calculatePrice(null);
            });

            futureList.add(future);
        }

        futureList.forEach((future) -> {
            try {
                // 계산 결과 출력
                System.out.printf("계산 결과 : %s\n", future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

        /**
         * 비동기 구현 calculatePriceAsync 메서드 호출로 변경한다.
         */
        // 비동기 처리 방식 2
        System.out.println("비동기 방식 호출 2");

        futureList.clear();

        for(int i = 0 ; i < 5 ; i++) {
            Future<Integer> future = cal.calculatePriceAsync(null);
            futureList.add(future);
        }

        futureList.forEach((future) -> {
            try {
                // get()을 사용해서 결과를 받는다. 
                System.out.printf("계산 결과 : %s\n", future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

 

 

 

반응형

Designed by JB FACTORY