반응형
728x90
반응형
Mono vs Flux
객체 | 차이 | 설명 |
Mono | 0 ~ 1 개의 데이터 전달 | Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체 |
Flux | 0 ~ N 개의 데이터 전달 | Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체 - 하나의 데이터를 전달할 때마다 onNext() 이벤트를 발생한다. - Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete() 이벤트가 발생한다. |
'Mono 예제 맛보기' 포스팅 바로가기
https://devfunny.tistory.com/882
Flux 예제 맛보기
Event.java
@Data
@AllArgsConstructor
public class Event {
long id;
String value;
}
Mono
1) 단건
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
@GetMapping("/event/{id}")
Mono<Event> event(@PathVariable long id) {
return Mono.just(new Event(id, "event" + id));
}
...
}
결과
2022-10-07 20:43:57.470 INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:43:57.472 INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | request(unbounded)
2022-10-07 20:43:57.473 INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onNext(Event(id=1, value=event1))
2022-10-07 20:43:57.482 INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onComplete()
2) 리스트 - 엔티티별로 수행되지 않고 통째로 수행된다.
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
...
@GetMapping("/event/list/{id}")
Mono<List<Event>> hello(@PathVariable long id) {
// Mono 안에 리스트를 담는다면?
List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
return Mono.just(list); // 하나의 데이터 (하나하나의 엔티티 레벨에서 뭔가 수행하는것은 안되고, 통째로 수행해야한다)
}
...
}
결과
2022-10-07 20:47:43.938 INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:47:43.940 INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | request(unbounded)
2022-10-07 20:47:43.940 INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onNext([Event(id=1, value=event1), Event(id=2, value=event2)])
2022-10-07 20:47:43.946 INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onComplete()
Flux
1) 리스트 - 각 엔티티별로 이벤트성으로 결과를 받아온다.
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
/**
* produces = MediaType.TEXT_EVENT_STREAM_VALUE
* 리스트일때 각 엔티티별로 이벤트성으로 결과를 받을 수 있다.
*
* @return
*/
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
Flux<Event> events() {
// Mono와 다르게 여러개를 넣을 수 있다.
return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
}
}
결과
2022-10-07 20:49:26.748 INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1 : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2022-10-07 20:49:26.750 INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1 : | request(1)
2022-10-07 20:49:26.751 INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1 : | onNext(Event(id=1, value=event1))
2022-10-07 20:49:26.761 INFO 4419 --- [ MvcAsync1] reactor.Flux.Array.1 : | request(1)
2022-10-07 20:49:26.762 INFO 4419 --- [ MvcAsync1] reactor.Flux.Array.1 : | onNext(Event(id=2, value=event2))
2022-10-07 20:49:26.762 INFO 4419 --- [ MvcAsync1] reactor.Flux.Array.1 : | onComplete()
2022-10-07 20:49:26.763 INFO 4419 --- [ MvcAsync2] reactor.Flux.Array.1 : | request(1)
2) 1초 delay
1초마다 10개씩, 10초(블로킹: 이메서드 전체 말고, 이 delay 처리하는 백그라운드 스레드)
= 백그라운드 스레드로 수행, 처음 수행된 스레드가 아닌 delay 처리하는 스레드가 10초동안 물고있다.
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
@GetMapping(value = "/events/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
Flux<Event> eventStream() {
return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
.delayElements(Duration.ofSeconds(1)) // 1초 delay
.take(10); // 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.
}
}
결과
2022-10-07 20:50:26.258 INFO 4419 --- [nio-8080-exec-3] reactor.Flux.Take.2 : onSubscribe(FluxTake.TakeSubscriber)
2022-10-07 20:50:26.258 INFO 4419 --- [nio-8080-exec-3] reactor.Flux.Take.2 : request(1)
-- 1초 후
2022-10-07 20:50:27.272 INFO 4419 --- [ parallel-1] reactor.Flux.Take.2 : onNext(Event(id=1665143426256, value=value))
2022-10-07 20:50:27.275 INFO 4419 --- [ MvcAsync3] reactor.Flux.Take.2 : request(1)
-- 1초 후
2022-10-07 20:50:28.274 INFO 4419 --- [ parallel-2] reactor.Flux.Take.2 : onNext(Event(id=1665143427273, value=value))
2022-10-07 20:50:28.282 INFO 4419 --- [ MvcAsync4] reactor.Flux.Take.2 : request(1)
-- 1초 후
2022-10-07 20:50:29.281 INFO 4419 --- [ parallel-3] reactor.Flux.Take.2 : onNext(Event(id=1665143428275, value=value))
2022-10-07 20:50:29.289 INFO 4419 --- [ MvcAsync5] reactor.Flux.Take.2 : request(1)
-- 1초 후
...
2022-10-07 20:50:36.323 INFO 4419 --- [ parallel-10] reactor.Flux.Take.2 : onComplete()
2022-10-07 20:50:36.324 INFO 4419 --- [ MvcAsync12] reactor.Flux.Take.2 : request(1)
3) sink
generate() 메서드를 사용하여 동기적인 방식으로 데이터를 하나씩 방출시킨다.
generate()
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
SynchrnousSink.java
public interface SynchronousSink<T> {
void complete();
@Deprecated
Context currentContext();
default ContextView contextView() {
return currentContext();
}
void error(Throwable e);
void next(T t);
}
예제
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
@GetMapping(value = "/events/sink", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
Flux<Event> eventSink() {
return Flux
.<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value"))) // sink : 데이터를 흘려서 보내는 역할
.delayElements(Duration.ofSeconds(1)) // 1초 delay (1초마다 10개씩, 10초(블로킹: 이메서드 전체 말고, 이 delay 처리하는 백그라운드 스레드) : 백그라운드 스레드로 수행, 처음 수행된 스레드가 아닌 delay 처리하는 스레드가 10초동안 물고있다)
.take(10); // 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.
}
}
결과
2022-10-07 20:53:20.502 INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Take.5 : onSubscribe(FluxTake.TakeSubscriber)
2022-10-07 20:53:20.502 INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Take.5 : request(1)
2022-10-07 20:53:21.507 INFO 4419 --- [ parallel-1] reactor.Flux.Take.5 : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:21.512 INFO 4419 --- [ MvcAsync33] reactor.Flux.Take.5 : request(1)
2022-10-07 20:53:22.509 INFO 4419 --- [ parallel-2] reactor.Flux.Take.5 : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:22.515 INFO 4419 --- [ MvcAsync34] reactor.Flux.Take.5 : request(1)
2022-10-07 20:53:23.514 INFO 4419 --- [ parallel-3] reactor.Flux.Take.5 : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:23.518 INFO 4419 --- [ MvcAsync35] reactor.Flux.Take.5 : request(1)
2022-10-07 20:53:24.519 INFO 4419 --- [ parallel-4] reactor.Flux.Take.5 : onNext(Event(id=1665143600502, value=value))
...
2022-10-07 20:53:30.556 INFO 4419 --- [ parallel-10] reactor.Flux.Take.5 : onComplete()
2022-10-07 20:53:30.558 INFO 4419 --- [ MvcAsync42] reactor.Flux.Take.5 : request(1)
4) 값 변형
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
@GetMapping(value = "/events/sink/generate", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
Flux<Event> eventGenerateSink() {
Flux<Event> es = Flux
.<Event, Long>generate(() -> 1L, (id, sink) -> {
sink.next(new Event(id, "value" + id));
return id + 1; // 바뀐값
})
.take(10);// 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 0, 12, 2, 3, 4 등
return Flux.zip(es, interval).map(tu -> tu.getT1());
}
}
결과
2022-10-07 20:55:38.869 INFO 4419 --- [nio-8080-exec-7] reactor.Flux.Map.7 : onSubscribe(FluxMap.MapSubscriber)
2022-10-07 20:55:38.869 INFO 4419 --- [nio-8080-exec-7] reactor.Flux.Map.7 : request(1)
2022-10-07 20:55:39.882 INFO 4419 --- [ parallel-2] reactor.Flux.Map.7 : onNext(Event(id=1, value=value1))
-- 1초 후
2022-10-07 20:55:39.884 INFO 4419 --- [ MvcAsync54] reactor.Flux.Map.7 : request(1)
2022-10-07 20:55:40.877 INFO 4419 --- [ parallel-3] reactor.Flux.Map.7 : onNext(Event(id=2, value=value2))
-- 1초 후
2022-10-07 20:55:40.885 INFO 4419 --- [ MvcAsync55] reactor.Flux.Map.7 : request(1)
2022-10-07 20:55:41.888 INFO 4419 --- [ parallel-4] reactor.Flux.Map.7 : onNext(Event(id=3, value=value3))
-- 1초 후
2022-10-07 20:55:41.891 INFO 4419 --- [ MvcAsync56] reactor.Flux.Map.7 : request(1)
2022-10-07 20:55:42.890 INFO 4419 --- [ parallel-5] reactor.Flux.Map.7 : onNext(Event(id=4, value=value4))
...
2022-10-07 20:55:48.920 INFO 4419 --- [ MvcAsync63] reactor.Flux.Map.7 : request(1)
2022-10-07 20:55:48.921 INFO 4419 --- [ parallel-1] reactor.Flux.Map.7 : onComplete()
반응형