반응형
728x90
반응형
Reactive streams 구현 라이브러리
ReactiveStreams의 구현 라이브러리를 하나씩 알아보자.
- Project reactor
- 리액티브 스트림 스펙의 구현체 중 하나로, 리액티브 프로그래밍 라이브러리
- RxJava
- 리액티브 프로그래밍을 위한 라이브러리
- Mutiny
- 리액티브 프로그래밍을 위한 작은 크기의 라이브러리
Project reactor
- Pivotal 사에서 개발
- Spring reactor에서 사용
- Java 및 Spring에서 사용되는 리액티브 프로그래밍 라이브러리
- Mono와 Flux publisher를 통해 비동기적으로 데이터를 다룰 수 있다.
Project reactor - Flux
- 0..n개의 item을 전달
- 에러가 발생하면 error signal 전달하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- backPressure 지원
Flux 예제
SimpleSubscriber
@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
private final Integer count;
/**
* 지속적으로 요청을 하는게 아니라, 딱 한번 N개의 요청을 받고 그 이후로 값을 계속 받음
* @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count); // count만큼 request
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(100);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
FluxSimpleExample
@Slf4j
public class p181_FluxSimpleExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
// main 쓰레드에서 수행
getItems() // 고정된 개수를 subscribe
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
실행결과
5개의 item이 전달된 결과
13:18:58.672 [main] INFO com.example06.reactor.p181_FluxSimpleExample - start main
13:18:58.733 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:18:58.737 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:18:58.904 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:18:59.049 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:18:59.233 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:18:59.399 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:18:59.578 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
13:18:59.578 [main] INFO com.example06.reactor.p181_FluxSimpleExample - end main
Flux - subscribeOn 예제
FluxSimpleSubscribeOnExample
@Slf4j
public class p182_FluxSimpleSubscribeOnExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.map(i -> {
log.info("map {}", i);
return i;
})
// main 쓰레드가 아닌 다른 쓰레드에서 수행
.subscribeOn(Schedulers.single())
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main"); // 바로 호출
Thread.sleep(1000);
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
실행결과
main 쓰레드가 아닌 single-1 쓰레드로 수행됨을 알 수 있다.
13:22:13.042 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - start main
13:22:13.094 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:22:13.122 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - end main
13:22:13.124 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 1
13:22:13.124 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:22:13.264 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 2
13:22:13.264 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:22:13.440 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 3
13:22:13.441 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:22:13.613 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 4
13:22:13.614 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:22:13.789 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 5
13:22:13.789 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:22:13.969 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - complete
▶ 별도 쓰레드로 수행시키는 작업과 관련해서 아래 포스팅을 참고하자.
https://devfunny.tistory.com/878
Flux - subscribe
FluxNoSubscribeExample
subscribe하지 않으면, 아무 일도 일어나지 않는다.
@Slf4j
public class p183_FluxNoSubscribeExample {
public static void main(String[] args) {
log.info("start main");
getItems(); // subscribe 하지않으면 아무일도 일어나지 않는다.
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
log.info("start getItems");
for (int i = 0; i < 5; i++) {
fluxSink.next(i);
}
fluxSink.complete();
log.info("end getItems");
});
}
}
Flux - backPressure
1) 1번째 예제
@Slf4j
public class p184_FluxSimpleRequestThreeExample {
public static void main(String[] args) {
// 3개 요청 (1, 2, 3 이후 종료) , 추가적인 요청 없음
getItems().subscribe(new p181_SimpleSubscriber<>(3));
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
SimpleSubscriber
@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
private final Integer count;
/**
* 지속적으로 요청을 하는게 아니라, 딱 한번 N개의 요청을 받고 그 이후로 값을 계속 받음
* @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count); // count만큼 request
log.info("request: {}", count);
}
...
}
실행결과
3개 요청 이후, 나머지는 수행되지 않는다.
09:24:32.953 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 3
09:24:32.958 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:24:33.091 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:24:33.232 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
2) 2번째 예제
@Slf4j
public class p185_FluxContinuousRequestSubscriberExample {
public static void main(String[] args) {
getItems().subscribe(new p185_ContinuousRequestSubscriber<>());
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
ContinuousRequestSubscriber
@Slf4j
public class p185_ContinuousRequestSubscriber<T>
implements Subscriber<T> {
private final Integer count = 1;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
log.info("subscribe");
s.request(count); // 개수만큼 요청
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(1000);
// 1개를 또 호출
subscription.request(1);
log.info("request: {}", count);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
실행결과
1개씩 request() 하기 때문에 아래와 같이 각 아이템별로 request(), onNext()가 호출된다.
09:33:34.419 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:34.425 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:33:39.718 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:39.727 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
Flux - error
@Slf4j
public class p186_FluxErrorExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.next(0);
fluxSink.next(1);
var error = new RuntimeException("error in flux");
fluxSink.error(error); // 에러 전달
});
}
}
실행결과
09:36:46.692 [main] INFO com.example06.reactor.p186_FluxErrorExample - start main
09:36:46.756 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:36:46.764 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 0
09:36:46.885 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:36:47.028 [main] ERROR com.example06.reactor.p181_SimpleSubscriber - error: error in flux
09:36:47.028 [main] INFO com.example06.reactor.p186_FluxErrorExample - end main
Flux - complete
@Slf4j
public class p187_FluxCompleteExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.complete(); // complete 전달
});
}
}
실행결과
09:37:31.038 [main] INFO com.example06.reactor.p187_FluxCompleteExample - start main
09:37:31.100 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:37:31.109 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:37:31.109 [main] INFO com.example06.reactor.p187_FluxCompleteExample - end main
Project reactor - Mono
- 0..1개의 item을 전달
- 에러가 발생하면 error signal 전달하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
Mono 예제
- 1개의 item만 전달하기 때문에 next 하나만 실행하면 complete가 보장
- 혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미 - 하나의 값이 있거나 없다
@Slf4j
public class p190_MonoSimpleExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Mono<Integer> getItems() {
return Mono.create(monoSink -> {
monoSink.success(1);
});
}
}
실행결과
09:39:02.445 [main] INFO com.example06.reactor.p190_MonoSimpleExample - start main
09:39:02.481 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:39:02.658 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:39:02.658 [main] INFO com.example06.reactor.p190_MonoSimpleExample - end main
Mono와 Flux
Mono<T> : Optional<T>
- 없거나 혹은 하나의 값
- Mono로 특정 사건이 완료되는 시점을 가리킬 수도 있다
Flux<T>: List<T>
- 무한하거나 유한한 여러 개의 값
Flux를 Mono로 변환
- Mono.from으로 Flux를 Mono로 변환
- 첫 번째 값만 전달
@Slf4j
public class p192_FluxToMonoExample {
public static void main(String[] args) {
log.info("start main");
// 1,2,3,4,5 중 첫번째값 1이 onNext로 전달되고 complete
// 뒤에 있는 값들은 모두 무시
Mono.from(getItems())
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
실행결과
Mono는 1개만 전달되므로, 원소 1은 수행되고 그 이후 원소들은 무시됨을 확인할 수 있다.
09:40:37.275 [main] INFO com.example06.reactor.p192_FluxToMonoExample - start main
09:40:37.340 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:40:37.365 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:40:37.473 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:40:37.473 [main] INFO com.example06.reactor.p192_FluxToMonoExample - end main
Flux를 Mono로 변환 (Mono<List<T>>)
@Slf4j
public class p193_FluxToListMonoExample {
public static void main(String[] args) {
log.info("start main");
getItems()
// collect 하고 complete 이벤트 발생 시점에 모은 값들을 모두 전달
// 1, 2, 3, 4, 5를 내부 배열에 저장하고, 가지고있던 값들을 모두 onNext() 한다.
// 하나로 합쳐져서 Mono로 한번 요청됨 ([1,2,3,4,5]
.collectList()
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
실행결과
09:41:24.680 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - start main
09:41:24.743 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:41:24.767 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:41:24.940 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:41:24.940 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - end main
Mono를 Flux로 변환
- flux()
- Mono를 next 한 번 호출하고 onComplete를 호출하는 Flux로 변환
@Slf4j
public class p194_MonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
// flux() - Mono를 next 한번 호출하고 onComplete를 호출하는 Flux로 변환
// [1,2,3,4,5]
getItems().flux()
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer>> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
실행결과
09:42:16.606 [main] INFO com.example06.reactor.p194_MonoToFluxExample - start main
09:42:16.650 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:42:16.694 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:42:16.802 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:42:16.802 [main] INFO com.example06.reactor.p194_MonoToFluxExample - end main
Mono를 Flux로 변환 (Mono<List<T>> -> Flux<T>)
@Slf4j
public class p195_ListMonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
getItems()
// Mono의 결과를 Flux 형태로 바꾸고, flux를 받아서 처리
// 1, 2, 3, 4, 5 하나씩 처리
.flatMapMany(value -> Flux.fromIterable(value))
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer>> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
실행결과
09:43:07.895 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - start main
09:43:07.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:43:08.082 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:43:08.239 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:43:08.414 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:43:08.588 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:43:08.724 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:43:08.724 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - end main
Flux와 Mono 예제 이전 포스팅 바로가기
▶ Mono
https://devfunny.tistory.com/882
▶ Flux
https://devfunny.tistory.com/883
RxJava
- Netflix 사에서 개발
- 닷넷 프레임워크를 지원하는 Reactive Extensions를 포팅 (다른 플랫폼으로 코드를 이식하는 작업)
- Flowable, Observable, Single, Maybe, Completable, publisher 제공
RxJava - Flowable
- 0..n개의 item을 전달
- 에러가 발생하면 error signal 전달하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- backPressure 지원
- Reactor의 Flux와 유사
Flowable 예제
@Slf4j
public class p199_FlowableExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flowable<Integer> getItems() {
return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
실행결과
09:53:02.296 [main] INFO com.example06.rxjava.p199_FlowableExample - start main
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:53:02.340 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:53:02.450 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:53:02.622 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:53:02.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:53:02.933 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:53:03.112 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:53:03.114 [main] INFO com.example06.rxjava.p199_FlowableExample - end main
Flowable - backPressure 예제
@Slf4j
public class p200_FlowableContinuousRequestSubscriberExample {
public static void main(String[] args) {
log.info("start main");
getItems()
// 1개씩 처리 (backPressure)
.subscribe(new p185_ContinuousRequestSubscriber<>());
log.info("end main");
}
private static Flowable<Integer> getItems() {
return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
ContinuousRequestSubscriber
@Slf4j
public class p185_ContinuousRequestSubscriber<T>
implements Subscriber<T> {
private final Integer count = 1;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
log.info("subscribe");
s.request(count); // 개수만큼 요청
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(1000);
// 1개를 또 호출
subscription.request(1);
log.info("request: {}", count);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
실행결과
09:54:17.223 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - start main
09:54:17.258 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:54:17.259 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:17.260 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:54:19.408 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:19.409 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:54:20.483 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:20.484 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:54:21.544 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:21.545 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:54:22.618 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:22.623 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
09:54:22.623 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - end main
RxJava - Observable
- 0..n개의 item을 전달
- 에러가 발생하면 error signal 전달하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- backPressure 지원하지 않는다.
Observable vs Flowable
Observable | Flowable |
Push 기반 | Pull 기반 |
Subscriber가 처리할 수 없더라도 item을 전달 | Subscriber가 request의 수를 조절 |
Reactive manifesto의 message driven을 일부만 준수 | Reactive manifesto의 message driven을 모두 준수 |
onSubscribe로 Disposable 전달 | onSubscribe시 Subscription 전달 |
Observable 예제
@Slf4j
public class p203_ObservableExample {
public static void main(String[] args) {
// 배압 조절 불가능
getItems()
.subscribe(new p203_SimpleObserver());
}
private static Observable<Integer> getItems() {
return Observable.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
SimpleObserver
@Slf4j
public class p203_SimpleObserver implements Observer {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
log.info("subscribe");
this.disposable = d;
}
@Override
public void onNext(@NonNull Object o) {
log.info("item: {}", o);
}
@Override
public void onError(@NonNull Throwable e) {
log.error("error: {}", e.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
실행결과
09:57:21.403 [main] INFO com.example06.rxjava.p203_SimpleObserver - subscribe
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 1
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 2
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 3
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 4
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 5
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - complete
RxJava - Single
- 1개의 item을 전달 후 바로 onComplete signal 전달
- 1개의 item이 없다면 onError signal 전달
- 정확히 1개의 아이템을 처리해야한다.
- 에러가 발생했다면 onError signal 전달
Single - success 예제
public class p205_SingleExample {
public static void main(String[] args) {
getItem()
.subscribe(new p205_SimpleSingleObserver<>());
}
private static Single<Integer> getItem() {
return Single.create(singleEmitter -> {
singleEmitter.onSuccess(1);
});
}
}
SimpleSingleObserver
@Slf4j
public class p205_SimpleSingleObserver<T> implements SingleObserver<T> {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
this.disposable = d;
log.info("subscribe");
}
@Override
public void onSuccess(@NonNull Object o) {
log.info("item: {}", o);
}
@Override
public void onError(@NonNull Throwable e) {
log.error("error: {}", e.getMessage());
}
}
실행결과
09:58:59.778 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:58:59.780 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - item: 1
Single - error (빈 값)
@Slf4j
public class p206_SingleNullExample {
public static void main(String[] args) {
getItem()
.subscribe(new p205_SimpleSingleObserver<>());
}
private static Single<Integer> getItem() {
return Single.create(singleEmitter -> {
singleEmitter.onSuccess(null); // 에러 발생시킴
});
}
}
실행결과
09:59:53.191 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:59:53.193 [main] ERROR com.example06.rxjava.p205_SimpleSingleObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.
RxJava - Maybe
- 1개의 item을 전달 후 바로 onComplete signal 전달
- 1개의 item이 없어도 onComplete signal 전달 가능
- 에러가 발생했다면 onError signal 전달
- Reactor의 Mono와 유사
SimpleMaybeObserver
@Slf4j
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
this.disposable = d;
log.info("subscribe");
}
@Override
public void onSuccess(@NonNull T t) {
log.info("item: {}", t);
}
@Override
public void onError(@NonNull Throwable e) {
log.error("error: {}", e.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
Maybe - success 예제
@Slf4j
public class p208_MaybeExample {
public static void main(String[] args) {
maybeGetItem()
.subscribe(new SimpleMaybeObserver<>());
}
private static Maybe<Integer> maybeGetItem() {
return Maybe.create(maybeEmitter -> {
maybeEmitter.onSuccess(1);
});
}
}
실행결과
10:42:03.579 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:03.581 [main] INFO com.example06.rxjava.SimpleMaybeObserver - item: 1
Maybe - success (빈 값) 예제
@Slf4j
public class p209_MaybeEmptyValueExample {
public static void main(String[] args) {
maybeGetItem()
.subscribe(new SimpleMaybeObserver<>());
}
private static Maybe<Integer> maybeGetItem() {
return Maybe.create(maybeEmitter -> {
maybeEmitter.onComplete(); // complete()만 호출
});
}
}
실행결과
10:42:24.592 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:24.593 [main] INFO com.example06.rxjava.SimpleMaybeObserver - complete
Maybe - error 예제
@Slf4j
public class p209_MaybeNullValueExample {
public static void main(String[] args) {
maybeGetItem()
.subscribe(new SimpleMaybeObserver<>());
}
private static Maybe<Integer> maybeGetItem() {
return Maybe.create(maybeEmitter -> {
maybeEmitter.onSuccess(null);
});
}
}
실행결과
10:42:48.454 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:48.456 [main] ERROR com.example06.rxjava.SimpleMaybeObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.
RxJava - Completable
- onComplete 혹은 onError signal만 전달
- 값이 아닌 사건을 전달
- 어떤 작업이 성공적으로 완료되었는지 또는 에러가 발생했는지만을 전달하기 때문에 따로 값을 보내지 않는다는 의미다.
SimpleCompletableObserver
@Slf4j
public class SimpleCompletableObserver implements CompletableObserver {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
log.info("subscribe");
this.disposable = d;
}
@Override
public void onComplete() {
log.info("complete");
}
@Override
public void onError(@NonNull Throwable e) {
log.error("error: {}", e.getMessage());
}
}
Completable - success 예제
@Slf4j
public class p212_CompletableExample {
public static void main(String[] args) {
getCompletion()
.subscribe(new SimpleCompletableObserver());
}
private static Completable getCompletion() {
return Completable.create(completableEmitter -> {
Thread.sleep(1000);
completableEmitter.onComplete(); // 값이 아닌 사건을 전달
});
}
}
실행결과
10:43:45.900 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:43:46.924 [main] INFO com.example06.rxjava.SimpleCompletableObserver - complete
Completable - error 예제
@Slf4j
public class p213_CompletableErrorExample {
public static void main(String[] args) {
getCompletion()
.subscribe(new SimpleCompletableObserver());
}
private static Completable getCompletion() {
return Completable.create(completableEmitter -> {
Thread.sleep(1000);
completableEmitter.onError(
new RuntimeException("error in completable")
);
});
}
}
실행결과
10:44:07.096 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:44:08.124 [main] ERROR com.example06.rxjava.SimpleCompletableObserver - error: error in completable
Publisher 총정리
onSubscribe | onNext | onComplete | onError | |
Flowable | backPressure, cancelation 지원 | 0..n | 언제든지 가능 | 언제든지 가능 |
Observable | cancelation 지원 | 0..n | 언제든지 가능 | 언제든지 가능 |
Single | cancelation 지원 | 1 | onNext 이후 바로 | 언제든지 가능 |
Maybe | cancelation 지원 | 0..1 | 언제든지 가능 onNext 이후 바로 |
언제든지 가능 |
Completable | cancelation 지원 | 0 | 언제든지 가능 | 언제든지 가능 |
Mutiny
- Hibernate reactive에서 비동기 라이브러리로 제공
- Multi, Uni publisher 제공
- 리액티브 프로그래밍을 단순화하고 간소화하기 위한 목적으로 만들어졌다.
Mutiny - Multi
- 0..n개의 item을 전달
- 에러가 발생하면 error signal 전달 하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- backPressure 지원
- Reactor의 flux와 유사
Multi 예제
@Slf4j
public class p218_MultiExample {
public static void main(String[] args) {
getItems()
.subscribe()
// subscribe 동시에 넘길 수 없음, subscribe() 호출 후 아래 호출 필요
.withSubscriber(
new p218_SimpleMultiSubscriber<>(Integer.MAX_VALUE)
);
}
private static Multi<Integer> getItems() {
return Multi.createFrom().items(1, 2, 3, 4, 5);
}
}
SimpleMultiSubscriber
@Slf4j
@RequiredArgsConstructor
public class p218_SimpleMultiSubscriber<T> implements MultiSubscriber<T> {
private final Integer count;
@Override
public void onSubscribe(Flow.Subscription s) {
s.request(count);
log.info("subscribe");
}
@Override
public void onItem(T item) {
log.info("item: {}", item);
}
@Override
public void onFailure(Throwable failure) {
log.error("fail: {}", failure.getMessage());
}
@Override
public void onCompletion() {
log.info("completion");
}
}
실행결과
10:46:36.701 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 1
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 2
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 3
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 4
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 5
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - completion
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - subscribe
Mutiny - Uni
- 0..1개의 item을 전달
- 에러가 발생하면 error signal 전달 하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- Reactor의 Mono와 유사
Uni 예제
@Slf4j
public class p220_UniExample {
public static void main(String[] args) {
getItem()
.subscribe()
.withSubscriber(new p220_SimpleUniSubscriber<>(Integer.MAX_VALUE));
}
private static Uni<Integer> getItem() {
return Uni.createFrom().item(1);
}
}
SimpleUniSubscriber
@Slf4j
@RequiredArgsConstructor
public class p220_SimpleUniSubscriber<T> implements UniSubscriber<T> {
private final Integer count;
private UniSubscription subscription;
@Override
public void onSubscribe(UniSubscription s) {
this.subscription = s;
s.request(1);
log.info("subscribe");
}
@Override
public void onItem(T item) {
log.info("item: {}", item);
}
@Override
public void onFailure(Throwable failure) {
log.error("error: {}", failure.getMessage());
}
}
실행결과
10:47:27.202 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - subscribe
10:47:27.208 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - item: 1
Reference
강의; Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
반응형
'Coding > Reactive' 카테고리의 다른 글
[리액티브 프로그래밍] 리액티브 스트림즈(Reactive Streams) 핵심 컴포넌트의 동작과정 (0) | 2023.06.11 |
---|---|
[리액티브 프로그래밍] Spring WebFlux의 요청 처리 흐름 (0) | 2023.05.02 |
[리액티브 프로그래밍] Reactor의 debugging (0) | 2023.04.30 |
[리액티브 프로그래밍] Reactor의 Context (0) | 2023.04.29 |
[리액티브 프로그래밍] Reactor의 Scheduler (0) | 2023.04.28 |