Example code for Reactive Streams implementation libraries (Reactor, RxJava, Mutiny)


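Reactive Streams implementation libraries

Let's go through the libraries that implement Reactive Streams one by one.

  • Project Reactor
    • A reactive programming library and one of the implementations of the Reactive Streams specification
  • RxJava
    • A library for reactive programming
  • Mutiny
    • A small, lightweight library for reactive programming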

 

 

Project Reactor

  • Developed by Pivotal
  • Used by Spring's reactive stack (for example, Spring WebFlux)
    • A reactive programming library used in Java and Spring
  • Data can be handled asynchronously through its two publishers, Mono and Flux.

 

 

 

Project Reactor - Flux

  • Delivers 0..n items
  • If an error occurs, it sends an error signal and terminates
  • Once all items have been delivered, it sends a complete signal and terminates
  • Supports backpressure
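
The signal contract listed above can be tried out without Reactor. The following is only a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces, not Reactor's implementation: the class SignalContractSketch and the helpers fromList and signals are hypothetical names, and validation of non-positive requests is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class SignalContractSketch {

    /** Hypothetical publisher: emits the list synchronously, but never more than was requested. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items per request (backpressure).
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                // Send the complete signal once, and only when the source is exhausted.
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes, requests `demand` items exactly once, and returns the signals in arrival order. */
    static <T> List<String> signals(List<T> items, long demand) {
        List<String> log = new ArrayList<>();
        fromList(items).subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(T item) { log.add("item: " + item); }
            @Override public void onError(Throwable t) { log.add("error: " + t.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // Demand of 3: three items arrive and no complete signal follows.
        System.out.println(signals(List.of(1, 2, 3, 4, 5), 3));
        // Unbounded demand: all five items arrive, then complete.
        System.out.println(signals(List.of(1, 2, 3, 4, 5), Long.MAX_VALUE));
    }
}
```

With a demand of 3 the subscriber never sees a complete signal, because the source still holds items that nobody asked for.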

 

 

Flux example

SimpleSubscriber
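import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j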
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * Does not keep requesting: it requests N items exactly once and then simply receives values.
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // request `count` items
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

FluxSimpleExample
@Slf4j
public class p181_FluxSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        // runs on the main thread
        getItems() // subscribe to a source with a fixed number of items
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result

All five items are delivered.

13:18:58.672 [main] INFO com.example06.reactor.p181_FluxSimpleExample - start main
13:18:58.733 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:18:58.737 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:18:58.904 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:18:59.049 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:18:59.233 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:18:59.399 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:18:59.578 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
13:18:59.578 [main] INFO com.example06.reactor.p181_FluxSimpleExample - end main

 

 

Flux - subscribeOn example

FluxSimpleSubscribeOnExample
@Slf4j
public class p182_FluxSimpleSubscribeOnExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                // runs on a thread other than main
                .subscribeOn(Schedulers.single())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main"); // called immediately

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result

onSubscribe is still invoked on main, but map and item delivery run on the single-1 thread rather than the main thread.

13:22:13.042 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - start main
13:22:13.094 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:22:13.122 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - end main
13:22:13.124 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 1
13:22:13.124 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:22:13.264 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 2
13:22:13.264 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:22:13.440 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 3
13:22:13.441 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:22:13.613 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 4
13:22:13.614 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:22:13.789 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 5
13:22:13.789 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:22:13.969 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - complete

 

▶ For more on running work on a separate thread, see the post below.

https://devfunny.tistory.com/878

 


 

 

Flux - subscribe

FluxNoSubscribeExample

If you do not subscribe, nothing happens.

@Slf4j
public class p183_FluxNoSubscribeExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems(); // nothing happens unless you subscribe
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            log.info("start getItems");
            for (int i = 0; i < 5; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete();
            log.info("end getItems");
        });
    }
}
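
The same laziness can be observed with plain JDK types. This is a rough sketch that assumes a hand-written java.util.concurrent.Flow publisher stands in for Flux; LazySubscribeSketch and itemsProduced are hypothetical names introduced only for illustration. It counts how many items the source produces with and without a subscriber.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySubscribeSketch {

    /** Builds a five-item publisher and returns how many items it actually produced. */
    static int itemsProduced(boolean subscribe) {
        AtomicInteger produced = new AtomicInteger();

        // Merely creating the publisher runs none of the emission logic below.
        Flow.Publisher<Integer> items = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < 5; i++) {
                    produced.incrementAndGet();
                    subscriber.onNext(next++);
                }
                if (next == 5 && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                next = 5;
                done = true;
            }
        });

        if (subscribe) {
            items.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
        }
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println(itemsProduced(false)); // no subscriber
        System.out.println(itemsProduced(true));  // one subscriber with unbounded demand
    }
}
```

Without a subscriber the counter stays at zero, which mirrors why "start getItems" is never logged in the example above.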

 

 

Flux - backPressure

1) First example

@Slf4j
public class p184_FluxSimpleRequestThreeExample {
    public static void main(String[] args) {
        // requests 3 items (receives 1, 2, 3 and then nothing more); no further requests are made
        getItems().subscribe(new p181_SimpleSubscriber<>(3));
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

SimpleSubscriber
@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * Does not keep requesting: it requests N items exactly once and then simply receives values.
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // request `count` items
        log.info("request: {}", count);
    }
    
    ...
}

 

Execution result

After the three requested items, the remaining items are never delivered and no complete signal arrives.

09:24:32.953 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 3
09:24:32.958 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:24:33.091 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:24:33.232 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3

 

 

2) Second example

@Slf4j
public class p185_FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new p185_ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

ContinuousRequestSubscriber
@Slf4j
public class p185_ContinuousRequestSubscriber<T>
        implements Subscriber<T> {
    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count); // request `count` items
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        // request one more item
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

Execution result

Because request() asks for one item at a time, request() and onNext() are called once per item, as shown below.

09:33:34.419 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:34.425 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:33:39.718 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:39.727 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete

 

 

Flux - error

@Slf4j
public class p186_FluxErrorExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.next(0);
            fluxSink.next(1);
            var error = new RuntimeException("error in flux");
            fluxSink.error(error); // deliver the error
        });
    }
}

 

Execution result
09:36:46.692 [main] INFO com.example06.reactor.p186_FluxErrorExample - start main
09:36:46.756 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:36:46.764 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 0
09:36:46.885 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:36:47.028 [main] ERROR com.example06.reactor.p181_SimpleSubscriber - error: error in flux
09:36:47.028 [main] INFO com.example06.reactor.p186_FluxErrorExample - end main

 

 

Flux - complete

@Slf4j
public class p187_FluxCompleteExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.complete(); // deliver the complete signal
        });
    }
}

 

Execution result
09:37:31.038 [main] INFO com.example06.reactor.p187_FluxCompleteExample - start main
09:37:31.100 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:37:31.109 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:37:31.109 [main] INFO com.example06.reactor.p187_FluxCompleteExample - end main

 

 

 

Project Reactor - Mono

  • Delivers 0..1 items
  • If an error occurs, it sends an error signal and terminates
  • Once its item (if any) has been delivered, it sends a complete signal and terminates

 

 

Mono example

  • Because at most one item is delivered, completion is guaranteed right after a single next
  • Completing without delivering anything means there is no value: a Mono holds either one value or none
@Slf4j
public class p190_MonoSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems() {
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

 

Execution result
09:39:02.445 [main] INFO com.example06.reactor.p190_MonoSimpleExample - start main
09:39:02.481 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:39:02.658 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:39:02.658 [main] INFO com.example06.reactor.p190_MonoSimpleExample - end main

 

 

Mono and Flux

Mono<T> : Optional<T>

  • No value, or exactly one value
  • A Mono can also be used to signal the moment a particular event completes

 

Flux<T>: List<T>

  • Multiple values, finite or infinite
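
To make the analogy concrete, here is a small sketch that uses only Optional and List from the JDK. It is about cardinality only: Optional and List hold values that already exist, whereas Mono and Flux are lazy and may be asynchronous. The names CardinalityAnalogy, firstOf and toMany are hypothetical.

```java
import java.util.List;
import java.util.Optional;

public class CardinalityAnalogy {

    /** Many -> at most one: keep only the first element, roughly what taking the first item of a Flux does. */
    static <T> Optional<T> firstOf(List<T> many) {
        return many.stream().findFirst();
    }

    /** At most one -> many: widen 0..1 values into a sequence of 0..1 elements. */
    static <T> List<T> toMany(Optional<T> one) {
        return one.map(value -> List.of(value)).orElse(List.of());
    }

    public static void main(String[] args) {
        System.out.println(firstOf(List.of(1, 2, 3, 4, 5)));
        System.out.println(firstOf(List.of()));
        System.out.println(toMany(Optional.of(1)));
    }
}
```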

 

 

Converting a Flux to a Mono

  • Mono.from converts a Flux into a Mono
  • Only the first value is delivered
@Slf4j
public class p192_FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        // of 1, 2, 3, 4, 5 only the first value, 1, is passed to onNext, followed by complete
        // all later values are ignored
        Mono.from(getItems())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result

Because a Mono delivers only one item, element 1 is processed and the elements after it are ignored.

09:40:37.275 [main] INFO com.example06.reactor.p192_FluxToMonoExample - start main
09:40:37.340 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:40:37.365 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:40:37.473 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:40:37.473 [main] INFO com.example06.reactor.p192_FluxToMonoExample - end main

 

 

Converting a Flux to a Mono (Mono<List<T>>)

@Slf4j
public class p193_FluxToListMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
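                // collects the items and delivers them all together when the complete event arrives
                // 1, 2, 3, 4, 5 are stored in an internal list, which is then emitted through a single onNext()
                // i.e. everything is merged and delivered once as a Mono ([1, 2, 3, 4, 5])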
                .collectList()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result
09:41:24.680 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - start main
09:41:24.743 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:41:24.767 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:41:24.940 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:41:24.940 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - end main

 

 

Converting a Mono to a Flux

  • flux()
  • Converts the Mono into a Flux that calls next once and then onComplete
@Slf4j
public class p194_MonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");

        // flux() - converts the Mono into a Flux that calls next once and then onComplete
        // the single item is the whole list [1, 2, 3, 4, 5]
        getItems().flux()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result
09:42:16.606 [main] INFO com.example06.reactor.p194_MonoToFluxExample - start main
09:42:16.650 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:42:16.694 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:42:16.802 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:42:16.802 [main] INFO com.example06.reactor.p194_MonoToFluxExample - end main

 

 

Converting a Mono to a Flux (Mono<List<T>> -> Flux<T>)

@Slf4j
public class p195_ListMonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // turns the Mono's result into a Flux and processes that Flux
                // 1, 2, 3, 4, 5 are handled one by one
                .flatMapMany(value -> Flux.fromIterable(value))
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result
09:43:07.895 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - start main
09:43:07.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:43:08.082 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:43:08.239 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:43:08.414 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:43:08.588 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:43:08.724 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:43:08.724 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - end main

 

 

 

Earlier posts with Flux and Mono examples

▶ Mono

https://devfunny.tistory.com/882

 


 

▶ Flux

https://devfunny.tistory.com/883

 


 

 

RxJava

  • Developed by Netflix
  • A port of Reactive Extensions, which originally targeted the .NET Framework (porting means moving code to another platform)
  • Provides the Flowable, Observable, Single, Maybe, and Completable publishers

 

 

RxJava - Flowable

  • Delivers 0..n items
  • If an error occurs, it sends an error signal and terminates
  • Once all items have been delivered, it sends a complete signal and terminates
  • Supports backpressure
  • Similar to Reactor's Flux

 

 

Flowable example

@Slf4j
public class p199_FlowableExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

Execution result
09:53:02.296 [main] INFO com.example06.rxjava.p199_FlowableExample - start main
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:53:02.340 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:53:02.450 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:53:02.622 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:53:02.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:53:02.933 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:53:03.112 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:53:03.114 [main] INFO com.example06.rxjava.p199_FlowableExample - end main

 

 

Flowable - backPressure example

@Slf4j
public class p200_FlowableContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // processes one item at a time (backpressure)
                .subscribe(new p185_ContinuousRequestSubscriber<>());
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

ContinuousRequestSubscriber: the same p185_ContinuousRequestSubscriber class listed in the Reactor backpressure example above, reused here unchanged.

 

Execution result
09:54:17.223 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - start main
09:54:17.258 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:54:17.259 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:17.260 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:54:19.408 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:19.409 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:54:20.483 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:20.484 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:54:21.544 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:21.545 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:54:22.618 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:22.623 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
09:54:22.623 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - end main

 

 

 

RxJava - Observable

  • Delivers 0..n items
  • If an error occurs, it sends an error signal and terminates
  • Once all items have been delivered, it sends a complete signal and terminates
  • Does not support backpressure.

 

 

Observable vs Flowable

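Observable | Flowable
Push-based | Pull-based
Delivers items even when the subscriber cannot keep up | The subscriber controls the number of items requested
Only partially follows the message-driven principle of the Reactive Manifesto | Fully follows the message-driven principle of the Reactive Manifesto
Passes a Disposable in onSubscribe | Passes a Subscription in onSubscribe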
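
The push/pull difference can be illustrated without RxJava. This is a deliberately simplified sketch using plain collections, not how Observable or Flowable are implemented; PushVsPullSketch, push, PullSource and pullInBatches are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PushVsPullSketch {

    /** Push style (Observable-like): the source sets the pace and hands over every item. */
    static <T> void push(List<T> source, Consumer<T> consumer) {
        for (T item : source) {
            consumer.accept(item); // the consumer cannot say "not yet"
        }
    }

    /** Pull style (Flowable-like): the consumer asks for n items and receives at most that many. */
    static final class PullSource<T> {
        private final List<T> source;
        private int next;

        PullSource(List<T> source) {
            this.source = source;
        }

        List<T> request(int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive");
            }
            int end = Math.min(next + n, source.size());
            List<T> batch = new ArrayList<>(source.subList(next, end));
            next = end;
            return batch;
        }
    }

    /** Drains a source by repeatedly requesting `batchSize` items, as a slow consumer would. */
    static <T> List<List<T>> pullInBatches(List<T> source, int batchSize) {
        PullSource<T> pull = new PullSource<>(source);
        List<List<T>> batches = new ArrayList<>();
        for (List<T> batch = pull.request(batchSize); !batch.isEmpty(); batch = pull.request(batchSize)) {
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        push(List.of(1, 2, 3, 4, 5), received::add);
        System.out.println(received);                                // everything arrives in one go
        System.out.println(pullInBatches(List.of(1, 2, 3, 4, 5), 2)); // arrives in consumer-sized batches
    }
}
```

In the pull variant the consumer decides how much arrives per step, which is the role request(n) plays on a Subscription.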

 

 

 

Observable example

@Slf4j
public class p203_ObservableExample {
    public static void main(String[] args) {
        // backpressure cannot be controlled
        getItems()
                .subscribe(new p203_SimpleObserver());
    }

    private static Observable<Integer> getItems() {
        return Observable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

SimpleObserver
@Slf4j
public class p203_SimpleObserver implements Observer<Object> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(@NonNull Object o) {
        log.info("item: {}", o);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

Execution result
09:57:21.403 [main] INFO com.example06.rxjava.p203_SimpleObserver - subscribe
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 1
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 2
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 3
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 4
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 5
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - complete

 

 

RxJava - Single

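  • Delivers exactly one item through onSuccess, which also serves as the completion signal (there is no separate onComplete)
  • If there is no item, an onError signal is sent
    • Exactly one item must be handled.
  • If an error occurs, an onError signal is sent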

 

 

 

Single - success example

public class p205_SingleExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(1);
        });
    }
}

 

SimpleSingleObserver
@Slf4j
public class p205_SimpleSingleObserver<T> implements SingleObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

 

Execution result
09:58:59.778 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:58:59.780 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - item: 1

 

 

Single - error (empty value)

@Slf4j
public class p206_SingleNullExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(null); // triggers an error
        });
    }
}

 

Execution result
09:59:53.191 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:59:53.193 [main] ERROR com.example06.rxjava.p205_SimpleSingleObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.

 

 

 

RxJava - Maybe

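  • Delivers one item through onSuccess, which also ends the stream (onComplete is not called afterwards)
  • If there is no item, it can still finish with an onComplete signal
  • If an error occurs, an onError signal is sent
  • Similar to Reactor's Mono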

 

 

SimpleMaybeObserver
@Slf4j
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

 

Maybe - success example

@Slf4j
public class p208_MaybeExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(1);
        });
    }
}

 

Execution result
10:42:03.579 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:03.581 [main] INFO com.example06.rxjava.SimpleMaybeObserver - item: 1

 

 

Maybe - success (empty value) example

@Slf4j
public class p209_MaybeEmptyValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onComplete(); // only onComplete() is called
        });
    }
}

 

Execution result
10:42:24.592 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:24.593 [main] INFO com.example06.rxjava.SimpleMaybeObserver - complete

 

 

Maybe - error example

@Slf4j
public class p209_MaybeNullValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(null);
        });
    }
}

 

Execution result
10:42:48.454 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:48.456 [main] ERROR com.example06.rxjava.SimpleMaybeObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.

 

 

RxJava - Completable

  • Delivers only an onComplete or onError signal
  • Delivers an event rather than a value
    • It only reports whether some task finished successfully or failed, so no value is ever sent.
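
For readers more familiar with the JDK, the closest standard-library analogy is probably CompletableFuture<Void>. The sketch below is an analogy under that assumption rather than RxJava code; EventOnlySketch and outcome are hypothetical names. It reports only whether a task completed or failed.

```java
import java.util.concurrent.CompletableFuture;

public class EventOnlySketch {

    /** Runs the task and reports only "complete" or "error: ...", never a value. */
    static String outcome(Runnable task) {
        return CompletableFuture.runAsync(task)
                .handle((ignored, error) -> {
                    if (error == null) {
                        return "complete";
                    }
                    // Failures thrown inside runAsync arrive wrapped in a CompletionException.
                    Throwable cause = error.getCause() != null ? error.getCause() : error;
                    return "error: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(outcome(() -> { }));
        System.out.println(outcome(() -> {
            throw new RuntimeException("error in completable");
        }));
    }
}
```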

 

 

SimpleCompletableObserver
@Slf4j
public class SimpleCompletableObserver implements CompletableObserver {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

 

 

Completable - success example

@Slf4j
public class p212_CompletableExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onComplete(); // delivers an event, not a value
        });
    }
}

 

Execution result
10:43:45.900 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:43:46.924 [main] INFO com.example06.rxjava.SimpleCompletableObserver - complete

 

 

Completable - error example

@Slf4j
public class p213_CompletableErrorExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onError(
                    new RuntimeException("error in completable")
            );
        });
    }
}

 

Execution result
10:44:07.096 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:44:08.124 [main] ERROR com.example06.rxjava.SimpleCompletableObserver - error: error in completable

 

 

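Publisher summary

Publisher | onSubscribe | onNext | onComplete | onError
Flowable | backpressure and cancellation supported | 0..n | at any time | at any time
Observable | cancellation supported | 0..n | at any time | at any time
Single | cancellation supported | 1 | right after onNext | at any time
Maybe | cancellation supported | 0..1 | at any time, or right after onNext | at any time
Completable | cancellation supported | 0 | at any time | at any time

For Single and Maybe the item arrives through onSuccess, which also ends the stream, so "right after onNext" means the item and the completion are one and the same signal.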

 

 

 

Mutiny

  • Provided as the asynchronous library of Hibernate Reactive
  • Offers the Multi and Uni publishers
  • Created to simplify and streamline reactive programming.

 

 

Mutiny - Multi

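  • Delivers 0..n items
  • If an error occurs, it sends an error signal (onFailure) and terminates
  • Once all items have been delivered, it sends a complete signal (onCompletion) and terminates
  • Supports backpressure
  • Similar to Reactor's Flux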

 

 

Multi example

@Slf4j
public class p218_MultiExample {
    public static void main(String[] args) {
        getItems()
                .subscribe()
                // the subscriber cannot be passed to subscribe() directly; call subscribe() first, then withSubscriber()
                .withSubscriber(
                        new p218_SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems() {
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

 

SimpleMultiSubscriber
@Slf4j
@RequiredArgsConstructor
public class p218_SimpleMultiSubscriber<T> implements MultiSubscriber<T> {
    private final Integer count;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        s.request(count);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }
}

 

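Execution result

"subscribe" is logged last because onSubscribe calls s.request(count) before logging, and the items are emitted synchronously inside that request() call.
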
10:46:36.701 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 1
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 2
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 3
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 4
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 5
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - completion
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - subscribe

 

 

Mutiny - Uni

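  • Delivers 0..1 items
  • If an error occurs, it sends an error signal (onFailure) and terminates
  • Delivering the item ends the stream; UniSubscriber has no separate completion callback
  • Similar to Reactor's Mono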

 

 

Uni example

@Slf4j
public class p220_UniExample {
    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new p220_SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem() {
        return Uni.createFrom().item(1);
    }
}

 

SimpleUniSubscriber
@Slf4j
@RequiredArgsConstructor
public class p220_SimpleUniSubscriber<T> implements UniSubscriber<T> {
    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription s) {
        this.subscription = s;
        s.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

 

Execution result
10:47:27.202 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - subscribe
10:47:27.208 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - item: 1

 

 

 

 

Reference

Course: Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지 (Mastering Spring WebFlux: From Coroutines to a Reactive MSA Project)
