[Spring Reactive Programming] 7. Reactive Streams - Flux 예제 맛보기

반응형
728x90
반응형

Mono vs Flux

객체 차이 설명
Mono 0 ~ 1 개의 데이터 전달 Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체
Flux 0 ~ N 개의 데이터 전달 Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체
- 하나의 데이터를 전달할 때마다 onNext() 이벤트를 발생한다.
- Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete() 이벤트가 발생한다.

 

'Mono 예제 맛보기' 포스팅 바로가기

https://devfunny.tistory.com/882

 

 

Flux 예제 맛보기

Event.java

@Data
@AllArgsConstructor
public class Event {
    long id;
    String value;
}

 

 

Mono

1) 단건

@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    @GetMapping("/event/{id}")
    Mono<Event> event(@PathVariable long id) {
       return Mono.just(new Event(id, "event" + id));
    }
    
    ...
}

 

결과
2022-10-07 20:43:57.470  INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:43:57.472  INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | request(unbounded)
2022-10-07 20:43:57.473  INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onNext(Event(id=1, value=event1))
2022-10-07 20:43:57.482  INFO 3524 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onComplete()

 

2) 리스트 - 엔티티별로 수행되지 않고 통째로 수행된다.

@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    ...
    
    @GetMapping("/event/list/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        // Mono 안에 리스트를 담는다면?
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list); // 하나의 데이터 (하나하나의 엔티티 레벨에서 뭔가 수행하는것은 안되고, 통째로 수행해야한다)
    }
    
    ...
}

 

결과
2022-10-07 20:47:43.938  INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:47:43.940  INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | request(unbounded)
2022-10-07 20:47:43.940  INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onNext([Event(id=1, value=event1), Event(id=2, value=event2)])
2022-10-07 20:47:43.946  INFO 3712 --- [nio-8080-exec-1] reactor.Mono.Just.1                      : | onComplete()

 

 

Flux

1) 리스트 - 각 엔티티별로 이벤트성으로 결과를 받아온다.

@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    /**
     * produces = MediaType.TEXT_EVENT_STREAM_VALUE
     * 리스트일때 각 엔티티별로 이벤트성으로 결과를 받을 수 있다.
     *
     * @return
     */
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
    Flux<Event> events() {
        // Mono와 다르게 여러개를 넣을 수 있다.
        return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
}

 

결과
2022-10-07 20:49:26.748  INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1                     : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2022-10-07 20:49:26.750  INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1                     : | request(1)
2022-10-07 20:49:26.751  INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Array.1                     : | onNext(Event(id=1, value=event1))
2022-10-07 20:49:26.761  INFO 4419 --- [      MvcAsync1] reactor.Flux.Array.1                     : | request(1)
2022-10-07 20:49:26.762  INFO 4419 --- [      MvcAsync1] reactor.Flux.Array.1                     : | onNext(Event(id=2, value=event2))
2022-10-07 20:49:26.762  INFO 4419 --- [      MvcAsync1] reactor.Flux.Array.1                     : | onComplete()
2022-10-07 20:49:26.763  INFO 4419 --- [      MvcAsync2] reactor.Flux.Array.1                     : | request(1)

 

2) 1초 delay

1초마다 10개씩, 10초(블로킹: 이메서드 전체 말고, 이 delay 처리하는 백그라운드 스레드)

= 백그라운드 스레드로 수행, 처음 수행된 스레드가 아닌 delay 처리하는 스레드가 10초동안 물고있다.

@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    @GetMapping(value = "/events/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
    Flux<Event> eventStream() {
        return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
                .delayElements(Duration.ofSeconds(1)) // 1초 delay
                .take(10); // 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.
    }
}

 

결과
2022-10-07 20:50:26.258  INFO 4419 --- [nio-8080-exec-3] reactor.Flux.Take.2                      : onSubscribe(FluxTake.TakeSubscriber)
2022-10-07 20:50:26.258  INFO 4419 --- [nio-8080-exec-3] reactor.Flux.Take.2                      : request(1)
-- 1초 후
2022-10-07 20:50:27.272  INFO 4419 --- [     parallel-1] reactor.Flux.Take.2                      : onNext(Event(id=1665143426256, value=value))
2022-10-07 20:50:27.275  INFO 4419 --- [      MvcAsync3] reactor.Flux.Take.2                      : request(1)
-- 1초 후
2022-10-07 20:50:28.274  INFO 4419 --- [     parallel-2] reactor.Flux.Take.2                      : onNext(Event(id=1665143427273, value=value))
2022-10-07 20:50:28.282  INFO 4419 --- [      MvcAsync4] reactor.Flux.Take.2                      : request(1)
-- 1초 후
2022-10-07 20:50:29.281  INFO 4419 --- [     parallel-3] reactor.Flux.Take.2                      : onNext(Event(id=1665143428275, value=value))
2022-10-07 20:50:29.289  INFO 4419 --- [      MvcAsync5] reactor.Flux.Take.2                      : request(1)
-- 1초 후

...

2022-10-07 20:50:36.323  INFO 4419 --- [    parallel-10] reactor.Flux.Take.2                      : onComplete()
2022-10-07 20:50:36.324  INFO 4419 --- [     MvcAsync12] reactor.Flux.Take.2                      : request(1)

1초마다 1개의 row씩 출력된다.

 

3) sink

generate() 메서드를 사용하여 동기적인 방식으로 데이터를 하나씩 방출시킨다. 

generate()
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
   return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}

 

SynchrnousSink.java
public interface SynchronousSink<T> {
   void complete();

   @Deprecated
   Context currentContext();

   default ContextView contextView() {
      return currentContext();
   }

   void error(Throwable e);

   void next(T t);
}

 

예제
@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    @GetMapping(value = "/events/sink", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
    Flux<Event> eventSink() {
        return Flux
                .<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value"))) // sink : 데이터를 흘려서 보내는 역할
                .delayElements(Duration.ofSeconds(1)) // 1초 delay (1초마다 10개씩, 10초(블로킹: 이메서드 전체 말고, 이 delay 처리하는 백그라운드 스레드) : 백그라운드 스레드로 수행, 처음 수행된 스레드가 아닌 delay 처리하는 스레드가 10초동안 물고있다)
                .take(10); // 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.
    }
}

 

결과
2022-10-07 20:53:20.502  INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Take.5                      : onSubscribe(FluxTake.TakeSubscriber)
2022-10-07 20:53:20.502  INFO 4419 --- [nio-8080-exec-1] reactor.Flux.Take.5                      : request(1)
2022-10-07 20:53:21.507  INFO 4419 --- [     parallel-1] reactor.Flux.Take.5                      : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:21.512  INFO 4419 --- [     MvcAsync33] reactor.Flux.Take.5                      : request(1)
2022-10-07 20:53:22.509  INFO 4419 --- [     parallel-2] reactor.Flux.Take.5                      : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:22.515  INFO 4419 --- [     MvcAsync34] reactor.Flux.Take.5                      : request(1)
2022-10-07 20:53:23.514  INFO 4419 --- [     parallel-3] reactor.Flux.Take.5                      : onNext(Event(id=1665143600502, value=value))
-- 1초 후
2022-10-07 20:53:23.518  INFO 4419 --- [     MvcAsync35] reactor.Flux.Take.5                      : request(1)
2022-10-07 20:53:24.519  INFO 4419 --- [     parallel-4] reactor.Flux.Take.5                      : onNext(Event(id=1665143600502, value=value))
...
2022-10-07 20:53:30.556  INFO 4419 --- [    parallel-10] reactor.Flux.Take.5                      : onComplete()
2022-10-07 20:53:30.558  INFO 4419 --- [     MvcAsync42] reactor.Flux.Take.5                      : request(1)

1초마다 1개의 row씩 출력된다.

4) 값 변형

@RestController
@RequiredArgsConstructor
@Slf4j
public class E42_FluxController {
    @GetMapping(value = "/events/sink/generate", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // 어떤 MediaType으로 리턴될것인가?
    Flux<Event> eventGenerateSink() {
        Flux<Event> es = Flux
                .<Event, Long>generate(() -> 1L, (id, sink) -> {
                    sink.next(new Event(id, "value" + id));
                    return id + 1; // 바뀐값
                })
                .take(10);// 10개 request가 오면 cancel() 날린다. 더이상 하지 않겠다.

        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 0, 12, 2, 3, 4 등

        return Flux.zip(es, interval).map(tu -> tu.getT1());
    }
}

 

결과
2022-10-07 20:55:38.869  INFO 4419 --- [nio-8080-exec-7] reactor.Flux.Map.7                       : onSubscribe(FluxMap.MapSubscriber)
2022-10-07 20:55:38.869  INFO 4419 --- [nio-8080-exec-7] reactor.Flux.Map.7                       : request(1)
2022-10-07 20:55:39.882  INFO 4419 --- [     parallel-2] reactor.Flux.Map.7                       : onNext(Event(id=1, value=value1))
-- 1초 후
2022-10-07 20:55:39.884  INFO 4419 --- [     MvcAsync54] reactor.Flux.Map.7                       : request(1)
2022-10-07 20:55:40.877  INFO 4419 --- [     parallel-3] reactor.Flux.Map.7                       : onNext(Event(id=2, value=value2))

-- 1초 후
2022-10-07 20:55:40.885  INFO 4419 --- [     MvcAsync55] reactor.Flux.Map.7                       : request(1)
2022-10-07 20:55:41.888  INFO 4419 --- [     parallel-4] reactor.Flux.Map.7                       : onNext(Event(id=3, value=value3))

-- 1초 후
2022-10-07 20:55:41.891  INFO 4419 --- [     MvcAsync56] reactor.Flux.Map.7                       : request(1)
2022-10-07 20:55:42.890  INFO 4419 --- [     parallel-5] reactor.Flux.Map.7                       : onNext(Event(id=4, value=value4))
...
2022-10-07 20:55:48.920  INFO 4419 --- [     MvcAsync63] reactor.Flux.Map.7                       : request(1)
2022-10-07 20:55:48.921  INFO 4419 --- [     parallel-1] reactor.Flux.Map.7                       : onComplete()

1초마다 1개의 row씩 출력된다.

 

반응형

Designed by JB FACTORY