[Spring Reactive Programming] 6. Reactive Streams - Mono 예제 맛보기

반응형
728x90
반응형

Mono vs Flux

객체 차이 설명
Mono 0 ~ 1 개의 데이터 전달 Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체
Flux 0 ~ N 개의 데이터 전달 Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체
- 하나의 데이터를 전달할 때마다 onNext() 이벤트를 발생한다.
- Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete() 이벤트가 발생한다.

 

'Flux 예제 맛보기' 포스팅 바로가기

https://devfunny.tistory.com/883

 

 

gradle 설정

implementation 'org.springframework.boot:spring-boot-starter-webflux'

 

 

Mono 예제 맛보기

1) just()

@GetMapping("/mono1")
    Mono<String> hello1() {
        // Mono : Publusher 을 만들고, (Publisher) 연결하고, 마지막에 Subscriber에 전달
        Mono m = Mono.just("Hello Webflux").log(); // subscribe() 됬을때의 로그
        return m;
    }

 

결과
2022-10-07 20:08:12.934  INFO 96651 --- [nio-8080-exec-2] reactor.Mono.Just.2                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:08:12.935  INFO 96651 --- [nio-8080-exec-2] reactor.Mono.Just.2                      : | request(unbounded)
2022-10-07 20:08:12.935  INFO 96651 --- [nio-8080-exec-2] reactor.Mono.Just.2                      : | onNext(Hello Webflux)
2022-10-07 20:08:12.935  INFO 96651 --- [nio-8080-exec-2] reactor.Mono.Just.2                      : | onComplete()

 

2) 비동기

@GetMapping("/mono2")
Mono<String> hello2() {
    log.info("pos1");
    Mono m = Mono.just("Hello Webflux").log(); // subscribe() 됬을때의 로그
    log.info("pos2");

    return m;
}

 

결과
2022-10-07 20:08:44.589  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : pos1
2022-10-07 20:08:44.596  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : pos2
2022-10-07 20:08:44.597  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.Just.3                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2022-10-07 20:08:44.597  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.Just.3                      : | request(unbounded)
2022-10-07 20:08:44.597  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.Just.3                      : | onNext(Hello Webflux)
2022-10-07 20:08:44.598  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.Just.3                      : | onComplete()

 

3) doOnNext() 수행 후, log()가 수행

@GetMapping("/mono3")
Mono<String> hello3() {
    log.info("pos1");
    
    // 비동기 방식
    // subscribe() 됬을때의 로그
    Mono m2 = Mono.just("Hello Webflux").doOnNext(c -> log.info(c)).log();
    return m2;
}

 

결과

사실상 동기적인 코드다. doOnNext() 수행 후, log()가 수행된다.

값을 받아서 출력하거나 또는 로그를 남기는 용도인데, 이는 스프링에 넘어가고, 스프링에서 subscribe()하는 시점에 호출된다.

2022-10-07 20:09:00.752  INFO 96651 --- [nio-8080-exec-5] com.reactive.step09.E41_MonoController   : pos1
2022-10-07 20:09:00.762  INFO 96651 --- [nio-8080-exec-5] reactor.Mono.PeekFuseable.4              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:09:00.762  INFO 96651 --- [nio-8080-exec-5] reactor.Mono.PeekFuseable.4              : | request(unbounded)
2022-10-07 20:09:00.763  INFO 96651 --- [nio-8080-exec-5] com.reactive.step09.E41_MonoController   : Hello Webflux
2022-10-07 20:09:00.763  INFO 96651 --- [nio-8080-exec-5] reactor.Mono.PeekFuseable.4              : | onNext(Hello Webflux)
2022-10-07 20:09:00.763  INFO 96651 --- [nio-8080-exec-5] reactor.Mono.PeekFuseable.4              : | onComplete()

 

4) 메서드 호출을 할 경우

@GetMapping("/mono5")
Mono<String> hello5() {
    log.info("pos1");
    Mono m4 = Mono.just(generateHello()).doOnNext(c -> log.info(c)).log();
    log.info("pos2");
    
    return m4;
}

 

generateHello()
private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
}

 

결과
2022-10-07 20:09:21.444  INFO 96651 --- [nio-8080-exec-7] com.reactive.step09.E41_MonoController   : pos1
2022-10-07 20:09:21.444  INFO 96651 --- [nio-8080-exec-7] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:09:21.444  INFO 96651 --- [nio-8080-exec-7] com.reactive.step09.E41_MonoController   : pos2
--- 이후 Mono 안의 로직 수행 로그
2022-10-07 20:09:21.445  INFO 96651 --- [nio-8080-exec-7] reactor.Mono.PeekFuseable.6              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:09:21.445  INFO 96651 --- [nio-8080-exec-7] reactor.Mono.PeekFuseable.6              : | request(unbounded)
2022-10-07 20:09:21.445  INFO 96651 --- [nio-8080-exec-7] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:09:21.446  INFO 96651 --- [nio-8080-exec-7] reactor.Mono.PeekFuseable.6              : | onNext(Hello Mono)
2022-10-07 20:09:21.446  INFO 96651 --- [nio-8080-exec-7] reactor.Mono.PeekFuseable.6              : | onComplete()

 

5) fromSupplier

@GetMapping("/mono6")
Mono<String> hello6() {
    Mono<String> m5 = Mono.fromSupplier(() -> generateHello()).doOnNext(c -> log.info(c)).log();

    // mono -> String
    String blockMsg = m5.block();

    log.info("pos2: {}", blockMsg);

    return m5;
}

 

generateHello()
private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
}

 

결과
----- block() 호출 이후 출력
2022-10-07 20:10:14.652  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | request(unbounded)
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] com.reactive.step09.E41_MonoController   : method generateHello()
----- doOnNext()
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onNext(Hello Mono)
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onComplete()
2022-10-07 20:10:14.653  INFO 96651 --- [io-8080-exec-10] com.reactive.step09.E41_MonoController   : pos2: Hello Mono
----- spring subscribe()
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | request(unbounded)
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onNext(Hello Mono)
2022-10-07 20:10:14.654  INFO 96651 --- [io-8080-exec-10] reactor.Mono.PeekFuseable.8              : | onComplete()

 

6) subscribe() 여러 방식

  • Hot : 새로운 구독을 하더라도 구독 이후의 시점의 실시간 데이터를 통지한다.
  • Cole : 데이터가 만들어져서 고정되어 어느 Subscriber 가 요청하던 데이터가 미리 셋팅되어 전송
    • 새로운 구독이 발생해도 동일한 데이터를 통지한다.
@GetMapping("/mono")
Mono<String> hello() {
    Mono<String> m5 = Mono.fromSupplier(() -> generateHello()).doOnNext(c -> log.info(c)).log();

    m5.subscribe(); // subscribe() 하고 리턴하면?
    return m5;
}

 

generateHello()
private String generateHello() {
    log.info("method generateHello()");
    return "Hello Mono";
}

 

결과
----- m5.subscribe() 이후
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | request(unbounded)
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onNext(Hello Mono)
2022-10-07 20:12:12.488  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onComplete()
----- spring subscribe()
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | request(unbounded)
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onNext(Hello Mono)
2022-10-07 20:12:12.489  INFO 96651 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.9              : | onComplete()

 

7) block()

@GetMapping("/mono/block")
Mono<String> helloMono() {
    Mono<String> m6 = Mono.fromSupplier(() -> generateHello()).doOnNext(c -> log.info(c)).log();
    String blockMsg6 = m6.block();
    log.info("pos2: {}", blockMsg6);
    return m6;
}

 

generateHello()
private String generateHello() {
    log.info("method generateHello()");
    return "Hello Mono";
}

 

결과
----- block() 호출 이후
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | request(unbounded)
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onNext(Hello Mono)
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onComplete()
2022-10-07 20:12:50.035  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : pos2: Hello Mono
----- spring subscribe()
2022-10-07 20:12:50.036  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2022-10-07 20:12:50.037  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | request(unbounded)
2022-10-07 20:12:50.037  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : method generateHello()
2022-10-07 20:12:50.037  INFO 96651 --- [nio-8080-exec-4] com.reactive.step09.E41_MonoController   : Hello Mono
2022-10-07 20:12:50.037  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onNext(Hello Mono)
2022-10-07 20:12:50.037  INFO 96651 --- [nio-8080-exec-4] reactor.Mono.PeekFuseable.10             : | onComplete()

 

 

반응형

Designed by JB FACTORY