[Spring Reactive Programming] 4. Reactive Streams - 자바와 스프링의 비동기 개발 기술 (Runnable, Callable, Future, FutureTask, DeferredResult, Emitter)

반응형
728x90
반응형

비동기 구현 예제 1) Runnable

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
매개변수, 리턴값이 없는 로직
ExecutorService es = Executors.newCachedThreadPool();
        /** 별도의 스레드로 실행해보자. */
        es.execute(() -> { // 매개변수, 리턴값이 없는 Runnable 구현
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            log.info("Async");
//            return "Hello"; // Runnable 은 리턴이 없다.
        });
        log.info("Exit");
결과
[main] INFO com.reactive.step04.E16_FutureEx - Exit
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async

 

 

비동기 구현 예제 2) Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
매개변수는 없고 리턴값이 존재하는 경우
/** 리턴이 필요한 경우 : 별도의 스레드로 실행해보자. */
es.submit(new Callable<String>() {
    @Override
    public String call() throws Exception { // Callable 구현
        Thread.sleep(2000);
        log.info("Async");
        return "Hello";
    }
});
log.info("Exit");
결과
[main] INFO com.reactive.step04.E16_FutureEx - Exit
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async

 

 

리턴값을 main 스레드로 가져오고싶은 경우 - Future

/** 리턴 결과를 main 스레드로 가져오고싶다.*/
/** 리턴이 필요한 경우 : 별도의 스레드로 실행해보자. */
Future<String> f = es.submit(new Callable<String>() {
    @Override
    public String call() throws Exception { // Callable 구현
        Thread.sleep(2000);
        log.info("Async");
        return "Hello";
    }
});

System.out.println(f.isDone()); // false (수행 시점에 작업 완료 여부 출력)
Thread.sleep(2000);
log.info("Start"); // 제일 먼저 수행되겠다.
System.out.println(f.isDone()); // true (true 일 경우 get()으로 값을 가져오는 로직을 사용할 수도 있다.)
        
log.info(f.get());
log.info("Exit"); // get() 후인 "Exit"은 제일 마지막에 수행되겠다.

 

수행 결과
false
[main] INFO com.reactive.step04.E16_FutureEx - Start
false
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async
[main] INFO com.reactive.step04.E16_FutureEx - Hello
[main] INFO com.reactive.step04.E16_FutureEx - Exit

 

 

FutureTask 사용하기

FutureTask.java
public class FutureTask<V> implements RunnableFuture<V> {
    ...
    
    protected void done() { } // 작업이 완료되면 호출되는 메서드 구현
    
    ...
}

 

SuccessCallback.java
/**
 * 성공 콜백
 */
interface SuccessCallback {
    void onSuccess(String result);
}

 

ErrorCallback.jaava
/**
 * 에러 콜백
 */
interface ExceptionCallback {
    void onError(Throwable t);
}

 

CallbackFutureTask.java
public static class CallbackFutureTask extends FutureTask<String> {
    SuccessCallback sc;
    ExceptionCallback ec;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
        super(callable);

        this.sc = Objects.requireNonNull(sc); // null 이 아니여야하고, null 이면 NullPointerException 발생
        this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
        try {
            sc.onSuccess(get());
        } catch (InterruptedException e) { // 예외 발생시, main 스레드에 어떻게 던져줄까?
            // interrupt 발생 시그널을 주자.
            Thread.currentThread().interrupt();
            // 작업을 수행하지말고 종료하라는 signal을 주는 에러
        } catch (ExecutionException e) {
            ec.onError(e.getCause());
        }
    }
}

 

main 메서드
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(2000);

//        if (1 == 1) {
//            throw new RuntimeException("Aysnc ERROR!!!"); // 고의 에러 발생
//        }

        log.info("Async");
        return "Hello";
    }, new SuccessCallback() {
        @Override
        public void onSuccess(String result) {
            System.out.println("Result: " + result);
        }
    }, new ExceptionCallback() {
        @Override
        public void onError(Throwable t) {
            System.out.println("Error: " + t.getMessage());
        }
    });

    es.execute(f);
    es.shutdown();
}

 

결과
[pool-3-thread-1] INFO com.reactive.step04.E18_FutureTaskCallbackEx - Async
Result: Hello

 

 

Controller 테스트

@RestController
@RequiredArgsConstructor
@Slf4j
public class E19_MyController {
    @GetMapping("/async")
    public String async() throws InterruptedException {
        // 이 로직을 수행하는 동안 블로킹 되는걸 원치않는다. -> callable 로 변경
        Thread.sleep(2000);
        return "hello";
    }
}
http://localhost:8080/async 접속

2초 후 아래와 같이 hello가 찍힌다.

http://localhost:8080/async

 

 

블로킹 되지 않도록 Callable로 구현해보기

@RestController
@RequiredArgsConstructor
@Slf4j
public class E19_MyController {
    ...

    /**
     * http-nio-8080-exec-3 (tomcat 위에서 도는 쓰레드)
     * main에서 실행하는게 아니므로 main 쓰레드가 아니다.
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        log.info("callable"); // http-nio-8080-exec-3

        return () -> {
            log.info("async"); // console 에는 이미 찍히고, (MvcAsync1 이라는 별도의 쓰레드로 실행 - 스프링)
            Thread.sleep(2000);
            return "hello"; // 결과를 클라이언트에 내려준다.
        };
        
//        return new Callable<String>() {
//            @Override
//            public String call() throws Exception {
//                log.info("async"); // console 에는 이미 찍히고, (MvcAsync1 이라는 별도의 쓰레드로 실행 - 스프링)
//                Thread.sleep(2000);
//                return "hello"; // 결과를 클라이언트에 내려준다.
//            }
//        };
    }
}
결과
[nio-8080-exec-8] com.reactive.step04.E19_MyController     : callable
[      MvcAsync2] com.reactive.step04.E19_MyController     : async

 

 

스레드 할당 과정 보기

어떤 웹 요청이 동시에 100개 요청이 오면 100개의 스레드 생성하여 할당되는 과정 보기
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 어떤 웹 요청이 동시에 100개 요청이 오면 100개의 스레드 생성하여 할당되는 과정 보기
 */
@Slf4j
public class E20_LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        // 호출 
    }

    private static void basic(String url) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);
//        ExecutorService es = Executors.newFixedThreadPool(20); (2)

        RestTemplate rt = new RestTemplate();

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            es.execute(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                rt.getForObject(url, String.class);

                sw.stop();
                log.info("Elapsed: " + idx + " -> " + sw.getTotalTimeMillis());
            });
        }

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS); // 대기작업이 끝날때까지 100초까지만 기다린다.
        main.stop();

        log.info("Total : {}", main.getTotalTimeSeconds());
    }
}

 

1) 스레드 100개가 생성되어 100개의 요청을 각 스레드가 수행한다.

/**
 * 100개가 동시에 2초정도 걸림
 * 각 요청별로 스레드가 생성되어 총 스레드 100개가 생성되었다.
*/
basic("http://localhost:8080/async");

...

ExecutorService es = Executors.newFixedThreadPool(100);
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 2.133658542

 

2) 스레드를 20개 생성하여, 100개의 요청을 수행한다. 2초짜리 작업 x 5회 = 10초정도 걸린다.

server.tomcat.max-threads=20 # default : 200
/**
 * application.properties 에 'server.tomcat.max-threads=20' 추가 후 수행해보자.
 * 10초 정도 걸린다.
 * 스레드는 1 ~ 20까지 총 20개만 생성되었다.
 * 100개의 요청이 갔지만 스레드는 20개까지만 생성 가능하므로 나머지 80개는 큐에서 대기중이였다.
 * 스레드가 풀리면 다음 20개, 그 다음 20개.. 이렇게 되면서 2초짜리 작업 x 5  = 10초 정도 걸린다.
 */
basic("http://localhost:8080/async");

...

ExecutorService es = Executors.newFixedThreadPool(20); (2)
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 10.132215125

 

3) 스레드를 20개 생성하고, 작업 스레드를 100개 만들어 사용한다.

/**
 * callable 호출해보자. (비동기)
 * 2초 걸렸다.
 * 스레드가 20개만 생성됨에도 위 동기 방식보다 훨씬 빠르다.
 *
 * 서블릿 스레드 / 작업 스레드
 * 서블릿 스레드는 20개로 돌려쓰지만, 2초짜리 작업인 작업 스레드는 100개를 만들어 사용한 것이다.
 */
basic("http://localhost:8080/callable");

...

ExecutorService es = Executors.newFixedThreadPool(100);
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 2.125754417

 

4) 서블릿 스레드 1개, 작업 스레드 100개로 100개의 요청을 처리한다.

server.tomcat.max-threads=1 # default : 200
/**
 * application.properties 에 'server.tomcat.max-threads=1' 로 변경 후 수행해보자.
 * 2.3 초 안에 100개 요청이 처리되었다.
 *
 * 서블릿 스레드 / 작업 스레드
 * 서블릿 스레드는 1개로 돌려쓰지만, 2초짜리 작업인 작업 스레드는 100개를 만들어 사용한 것이다.
 *
 * -> 스레드 풀의 개수가 100개 정도인데, 긴 작업을 수행하는게 20개정도 있을때
 * 긴 작업은 작업 스레드가 수행하게하고, 그러는 사이에 서블릿 스레드는 빠르게 처리해서 마칠 수 있는 일반적인 스레드의 처리 작업에
 * 다 할당을 시키면 서블릿 스레드의 활용도가 높다.
 *
 * (결론)
 * 서블릿 스레드는 빠르게 반환되어 1개의 스레드를 가지고도 많은 요청을 처리할 수는 있다.
 */
basic("http://localhost:8080/callable");

...

ExecutorService es = Executors.newFixedThreadPool(100);
결과
 [main] INFO com.reactive.step04.E20_LoadTest - Total : 2.220901875

 

 

DeferrdResultController.java 생성하여 테스트해보기

DeferrdResultController.java

 

package com.reactive.step04;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

@RestController
@RequiredArgsConstructor
@Slf4j
public class E21_DeferredResultController {
    // keep
    Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();


    @GetMapping("/dr")
    public DeferredResult<String> dr() {
        log.info("dr");
        DeferredResult<String> dr = new DeferredResult<>();

        results.add(dr);

        return dr;
    }

    @GetMapping("/dr/count")
    public String drCount() {
        return String.valueOf(results.size());
    }

    @GetMapping("/dr/event")
    public String drEvent(String msg) {
        for (DeferredResult<String> dr : results) {
            dr.setResult("Hello " + msg);
            results.remove(dr);
        }

        return "OK";
    }
}
실행순서
  • 1. localhost:8080/dr 수행
    • 응답이 오지 않은 대기 상태다.
  • 2. localhost:8080/dr/count 수행
    • 위 1)번에서 수행된 add()로 총 1개다.
  • 3. localohost:8080/dr/event?msg=Result 수행
    • dr.setResult()가 호출되면서 1)번의 대기 상태가 끝나고 결과가 출력되면서 끝이난다.

위 과정은 작업 스레드를 별도로 생성하지 않는다. DeferredResut Object만 메모리에 유지가 되고, 위와 같은 방식으로 이벤트 발생시 결과를 내려준다. Servlet 자원은 최소한으로 하고, 동시에 수많은 요청을 처리하는데 효율적이다.

 

 

위 로직들도 아까와 같은 방법으로 테스트해보자.

5) 100개의 요청이 날라간다. 모든 요청은 대기상태다. event가 발생하여 setResult()가 호출할때까지 대기 상태다.

server.tomcat.max-threads=1 # default : 200
/**
 * E21_DeferredResultController.java 도 동일하게 수행해보자.
 * application.properties 에 'server.tomcat.max-threads=1' 에서 수행
 *
 * 100개 요청이 날라가면 모두 대기 상태다.
 * localhost:8080/dr/count 하면 100으로 출력된다.
 * localhost:8080/dr/event?msg=Async
 * 하면 응답이 한꺼번에 출력되고, dr/count 에는 0으로 출력된다.
 * 이벤트 1개로 100개의 요청에 한번에 응답을 주는 것이다.
*/
basic("http://localhost:8080/dr");

...

ExecutorService es = Executors.newFixedThreadPool(100);
  • 1) 실행
  • 2) http://localhost:8080/dr/count -> 100
  • 3) http://localhost:8080/dr/event?msg=Async -> 모든 요청이 대기상태에서 끝난다.

 

결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 25.253537458

 

 

Emitter 맛보기

E22_EmitterController.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;

@RestController
@RequiredArgsConstructor
@Slf4j
public class E22_EmitterController {
    // keep
    Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();

    /**
     * ResponseBodyEmitter
     * HTTP 안에 Data를 여러번에 나눠서 보낼 수 있다.
     * 하나의 요청에 여러번 응답을 보낼 수 있다.
     * @return
     */
    @GetMapping("/emitter")
    public ResponseBodyEmitter dr() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        Executors.newSingleThreadExecutor().submit(() -> {
            for (int i = 1; i <= 50; i++) {
                try {
                    // 50 번 나눠서 응답을 내려준다.
                    emitter.send("<p>Stream " + i + "</p>");
                    Thread.sleep(200);
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        return emitter;
    }
}
1) http://localhost:8080/emitter 호출

1부터 50까지 50번 나눠서 응답을 내려준다.

1, 2, 3, 4 ... 50 까지 하나씩 출력된 후의 모습이다.

http://localhost:8080/emitter

 

참고 : 토비의 봄 TV

 

반응형

Designed by JB FACTORY