반응형
728x90
반응형
비동기 구현 예제 1) Runnable
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
매개변수, 리턴값이 없는 로직
ExecutorService es = Executors.newCachedThreadPool();
/** 별도의 스레드로 실행해보자. */
es.execute(() -> { // 매개변수, 리턴값이 없는 Runnable 구현
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Async");
// return "Hello"; // Runnable 은 리턴이 없다.
});
log.info("Exit");
결과
[main] INFO com.reactive.step04.E16_FutureEx - Exit
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async
비동기 구현 예제 2) Callable
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
매개변수는 없고 리턴값이 존재하는 경우
/** 리턴이 필요한 경우 : 별도의 스레드로 실행해보자. */
es.submit(new Callable<String>() {
@Override
public String call() throws Exception { // Callable 구현
Thread.sleep(2000);
log.info("Async");
return "Hello";
}
});
log.info("Exit");
결과
[main] INFO com.reactive.step04.E16_FutureEx - Exit
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async
리턴값을 main 스레드로 가져오고싶은 경우 - Future
/** 리턴 결과를 main 스레드로 가져오고싶다.*/
/** 리턴이 필요한 경우 : 별도의 스레드로 실행해보자. */
Future<String> f = es.submit(new Callable<String>() {
@Override
public String call() throws Exception { // Callable 구현
Thread.sleep(2000);
log.info("Async");
return "Hello";
}
});
System.out.println(f.isDone()); // false (수행 시점에 작업 완료 여부 출력)
Thread.sleep(2000);
log.info("Start"); // 제일 먼저 수행되겠다.
System.out.println(f.isDone()); // true (true 일 경우 get()으로 값을 가져오는 로직을 사용할 수도 있다.)
log.info(f.get());
log.info("Exit"); // get() 후인 "Exit"은 제일 마지막에 수행되겠다.
수행 결과
false
[main] INFO com.reactive.step04.E16_FutureEx - Start
false
[pool-1-thread-1] INFO com.reactive.step04.E16_FutureEx - Async
[main] INFO com.reactive.step04.E16_FutureEx - Hello
[main] INFO com.reactive.step04.E16_FutureEx - Exit
FutureTask 사용하기
FutureTask.java
public class FutureTask<V> implements RunnableFuture<V> {
...
protected void done() { } // 작업이 완료되면 호출되는 메서드 구현
...
}
SuccessCallback.java
/**
* 성공 콜백
*/
interface SuccessCallback {
void onSuccess(String result);
}
ErrorCallback.jaava
/**
* 에러 콜백
*/
interface ExceptionCallback {
void onError(Throwable t);
}
CallbackFutureTask.java
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc); // null 이 아니여야하고, null 이면 NullPointerException 발생
this.ec = Objects.requireNonNull(ec);
}
@Override
protected void done() {
try {
sc.onSuccess(get());
} catch (InterruptedException e) { // 예외 발생시, main 스레드에 어떻게 던져줄까?
// interrupt 발생 시그널을 주자.
Thread.currentThread().interrupt();
// 작업을 수행하지말고 종료하라는 signal을 주는 에러
} catch (ExecutionException e) {
ec.onError(e.getCause());
}
}
}
main 메서드
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
// if (1 == 1) {
// throw new RuntimeException("Aysnc ERROR!!!"); // 고의 에러 발생
// }
log.info("Async");
return "Hello";
}, new SuccessCallback() {
@Override
public void onSuccess(String result) {
System.out.println("Result: " + result);
}
}, new ExceptionCallback() {
@Override
public void onError(Throwable t) {
System.out.println("Error: " + t.getMessage());
}
});
es.execute(f);
es.shutdown();
}
결과
[pool-3-thread-1] INFO com.reactive.step04.E18_FutureTaskCallbackEx - Async
Result: Hello
Controller 테스트
@RestController
@RequiredArgsConstructor
@Slf4j
public class E19_MyController {
@GetMapping("/async")
public String async() throws InterruptedException {
// 이 로직을 수행하는 동안 블로킹 되는걸 원치않는다. -> callable 로 변경
Thread.sleep(2000);
return "hello";
}
}
http://localhost:8080/async 접속
2초 후 아래와 같이 hello가 찍힌다.
블로킹 되지 않도록 Callable로 구현해보기
@RestController
@RequiredArgsConstructor
@Slf4j
public class E19_MyController {
...
/**
* http-nio-8080-exec-3 (tomcat 위에서 도는 쓰레드)
* main에서 실행하는게 아니므로 main 쓰레드가 아니다.
* @return
* @throws InterruptedException
*/
@GetMapping("/callable")
public Callable<String> callable() throws InterruptedException {
log.info("callable"); // http-nio-8080-exec-3
return () -> {
log.info("async"); // console 에는 이미 찍히고, (MvcAsync1 이라는 별도의 쓰레드로 실행 - 스프링)
Thread.sleep(2000);
return "hello"; // 결과를 클라이언트에 내려준다.
};
// return new Callable<String>() {
// @Override
// public String call() throws Exception {
// log.info("async"); // console 에는 이미 찍히고, (MvcAsync1 이라는 별도의 쓰레드로 실행 - 스프링)
// Thread.sleep(2000);
// return "hello"; // 결과를 클라이언트에 내려준다.
// }
// };
}
}
결과
[nio-8080-exec-8] com.reactive.step04.E19_MyController : callable
[ MvcAsync2] com.reactive.step04.E19_MyController : async
스레드 할당 과정 보기
어떤 웹 요청이 동시에 100개 요청이 오면 100개의 스레드 생성하여 할당되는 과정 보기
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 어떤 웹 요청이 동시에 100개 요청이 오면 100개의 스레드 생성하여 할당되는 과정 보기
*/
@Slf4j
public class E20_LoadTest {
static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 호출
}
private static void basic(String url) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);
// ExecutorService es = Executors.newFixedThreadPool(20); (2)
RestTemplate rt = new RestTemplate();
StopWatch main = new StopWatch();
main.start();
for (int i = 0; i < 100; i++) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread: {}", idx);
StopWatch sw = new StopWatch();
sw.start();
rt.getForObject(url, String.class);
sw.stop();
log.info("Elapsed: " + idx + " -> " + sw.getTotalTimeMillis());
});
}
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS); // 대기작업이 끝날때까지 100초까지만 기다린다.
main.stop();
log.info("Total : {}", main.getTotalTimeSeconds());
}
}
1) 스레드 100개가 생성되어 100개의 요청을 각 스레드가 수행한다.
/**
* 100개가 동시에 2초정도 걸림
* 각 요청별로 스레드가 생성되어 총 스레드 100개가 생성되었다.
*/
basic("http://localhost:8080/async");
...
ExecutorService es = Executors.newFixedThreadPool(100);
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 2.133658542
2) 스레드를 20개 생성하여, 100개의 요청을 수행한다. 2초짜리 작업 x 5회 = 10초정도 걸린다.
server.tomcat.max-threads=20 # default : 200
/**
* application.properties 에 'server.tomcat.max-threads=20' 추가 후 수행해보자.
* 10초 정도 걸린다.
* 스레드는 1 ~ 20까지 총 20개만 생성되었다.
* 100개의 요청이 갔지만 스레드는 20개까지만 생성 가능하므로 나머지 80개는 큐에서 대기중이였다.
* 스레드가 풀리면 다음 20개, 그 다음 20개.. 이렇게 되면서 2초짜리 작업 x 5 = 10초 정도 걸린다.
*/
basic("http://localhost:8080/async");
...
ExecutorService es = Executors.newFixedThreadPool(20); (2)
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 10.132215125
3) 스레드를 20개 생성하고, 작업 스레드를 100개 만들어 사용한다.
/**
* callable 호출해보자. (비동기)
* 2초 걸렸다.
* 스레드가 20개만 생성됨에도 위 동기 방식보다 훨씬 빠르다.
*
* 서블릿 스레드 / 작업 스레드
* 서블릿 스레드는 20개로 돌려쓰지만, 2초짜리 작업인 작업 스레드는 100개를 만들어 사용한 것이다.
*/
basic("http://localhost:8080/callable");
...
ExecutorService es = Executors.newFixedThreadPool(100);
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 2.125754417
4) 서블릿 스레드 1개, 작업 스레드 100개로 100개의 요청을 처리한다.
server.tomcat.max-threads=1 # default : 200
/**
* application.properties 에 'server.tomcat.max-threads=1' 로 변경 후 수행해보자.
* 2.3 초 안에 100개 요청이 처리되었다.
*
* 서블릿 스레드 / 작업 스레드
* 서블릿 스레드는 1개로 돌려쓰지만, 2초짜리 작업인 작업 스레드는 100개를 만들어 사용한 것이다.
*
* -> 스레드 풀의 개수가 100개 정도인데, 긴 작업을 수행하는게 20개정도 있을때
* 긴 작업은 작업 스레드가 수행하게하고, 그러는 사이에 서블릿 스레드는 빠르게 처리해서 마칠 수 있는 일반적인 스레드의 처리 작업에
* 다 할당을 시키면 서블릿 스레드의 활용도가 높다.
*
* (결론)
* 서블릿 스레드는 빠르게 반환되어 1개의 스레드를 가지고도 많은 요청을 처리할 수는 있다.
*/
basic("http://localhost:8080/callable");
...
ExecutorService es = Executors.newFixedThreadPool(100);
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 2.220901875
DeferrdResultController.java 생성하여 테스트해보기
DeferrdResultController.java
package com.reactive.step04;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
@RestController
@RequiredArgsConstructor
@Slf4j
public class E21_DeferredResultController {
// keep
Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
@GetMapping("/dr")
public DeferredResult<String> dr() {
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>();
results.add(dr);
return dr;
}
@GetMapping("/dr/count")
public String drCount() {
return String.valueOf(results.size());
}
@GetMapping("/dr/event")
public String drEvent(String msg) {
for (DeferredResult<String> dr : results) {
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}
}
실행순서
- 1. localhost:8080/dr 수행
- 응답이 오지 않은 대기 상태다.
- 2. localhost:8080/dr/count 수행
- 위 1)번에서 수행된 add()로 총 1개다.
- 3. localohost:8080/dr/event?msg=Result 수행
- dr.setResult()가 호출되면서 1)번의 대기 상태가 끝나고 결과가 출력되면서 끝이난다.
위 과정은 작업 스레드를 별도로 생성하지 않는다. DeferredResut Object만 메모리에 유지가 되고, 위와 같은 방식으로 이벤트 발생시 결과를 내려준다. Servlet 자원은 최소한으로 하고, 동시에 수많은 요청을 처리하는데 효율적이다.
위 로직들도 아까와 같은 방법으로 테스트해보자.
5) 100개의 요청이 날라간다. 모든 요청은 대기상태다. event가 발생하여 setResult()가 호출할때까지 대기 상태다.
server.tomcat.max-threads=1 # default : 200
/**
* E21_DeferredResultController.java 도 동일하게 수행해보자.
* application.properties 에 'server.tomcat.max-threads=1' 에서 수행
*
* 100개 요청이 날라가면 모두 대기 상태다.
* localhost:8080/dr/count 하면 100으로 출력된다.
* localhost:8080/dr/event?msg=Async
* 하면 응답이 한꺼번에 출력되고, dr/count 에는 0으로 출력된다.
* 이벤트 1개로 100개의 요청에 한번에 응답을 주는 것이다.
*/
basic("http://localhost:8080/dr");
...
ExecutorService es = Executors.newFixedThreadPool(100);
- 1) 실행
- 2) http://localhost:8080/dr/count -> 100
- 3) http://localhost:8080/dr/event?msg=Async -> 모든 요청이 대기상태에서 끝난다.
결과
[main] INFO com.reactive.step04.E20_LoadTest - Total : 25.253537458
Emitter 맛보기
E22_EmitterController.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
@RestController
@RequiredArgsConstructor
@Slf4j
public class E22_EmitterController {
// keep
Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
/**
* ResponseBodyEmitter
* HTTP 안에 Data를 여러번에 나눠서 보낼 수 있다.
* 하나의 요청에 여러번 응답을 보낼 수 있다.
* @return
*/
@GetMapping("/emitter")
public ResponseBodyEmitter dr() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
for (int i = 1; i <= 50; i++) {
try {
// 50 번 나눠서 응답을 내려준다.
emitter.send("<p>Stream " + i + "</p>");
Thread.sleep(200);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});
return emitter;
}
}
1) http://localhost:8080/emitter 호출
1부터 50까지 50번 나눠서 응답을 내려준다.
1, 2, 3, 4 ... 50 까지 하나씩 출력된 후의 모습이다.
참고 : 토비의 봄 TV
반응형