반응형
728x90
반응형
Pub/Sub
1) Publisher 생성
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(10)
.collect(Collectors.toList()));
- iterPub()
private static Publisher<Integer> iterPub(List<Integer> iter) {
Publisher<Integer> pub = new Publisher<Integer>() {
// Publisher 의 구현해야하는 메서드
@Override
public void subscribe(Subscriber<? super Integer> sub) { // 호출하면 그때부터 데이터를 통지
// Subscription : Publisher, Subscriber 둘 사이의 구독이 한번 일어난다는 의미
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
// iterable 의 원소를 통지한다.
iter.forEach(s -> sub.onNext(s));
// 여기서 멈추면 안되고, publisher 가 데이터 통지가 완료했으면 완료됨을 호출해야한다.
sub.onComplete();
} catch (Throwable t) {
// 에러 처리
sub.onError(t);
}
}
/**
* Subscriber 에서 Subscription 객체의 cancel()을 호출할 수 있다.
* 더이상 데이터를 통지받지 않겠다고 알림
*/
@Override
public void cancel() {
}
});
}
};
return pub;
}
2) Subscriber 생성
// 구독자
Subscriber<Integer> sub = logSub();
- logSub()
private static Subscriber<Integer> logSub() {
Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// Subscription 의 request 를 요청해야한다.
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer i) {
log.debug("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
return sub;
}
3) 구독 시작
// 구독 시작
pub.subscribe(sub);
리팩토링 - DeleagateSub 파일 생성
DeleagateSub.java
package com.reactive.step02;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
public class DelegateSub implements Subscriber<Integer> {
private Subscriber sub;
public DelegateSub(Subscriber sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
▶ Subscriber 객체를 생성할때 재정의가 필요한 메서드만 선언한다.
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
}); // 새로운 Subscriber
2단계 적용
pub -> data1 -> mapPub -> data2 -> logSub
1) Publisher 생성
Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);
- mapPub()
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
// pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
@Override
public void subscribe(Subscriber<? super Integer> sub) { // logSub
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
}); // 새로운 Subscriber
}
};
}
2) 구독 시작
mapPub.subscribe(sub);
결과
21:36:49.407 [main] DEBUG com.reactive.step02.E05_PubSub - onSubscribe
21:36:49.408 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:10
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:20
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:30
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:40
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:50
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:60
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:70
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:80
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:90
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:100
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onComplete
sumPub
데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<Integer> sumPup = sumPub(pub);
- sumPub()
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
// 중개 역할
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int sum = 0;
@Override
public void onNext(Integer i) {
// 넘기면 안된다.
sum += i;
// 결과를 넘기는건 모든 데이터의 합계를 완료했을 때
// 완료를 알 수 있는 법 : onComplete()
}
/**
* Publisher가 완료의 신호를 줄때, 그때 Sub의 onNext를 호출해서 데이터를 한번 전달한다.
*/
@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
});
}
};
}
1) onComplete() 안에서 onNext(), onComplete()를 한번에 호출한다.
/**
* Publisher가 완료의 신호를 줄때, 그때 Sub의 onNext를 호출해서 데이터를 한번 전달한다.
*/
@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
ReducePub
덧셈, 곱셈 등의 식은 매개변수로 넘기도록 BiFunction 인터페이스를 사용한다.
sumPub()과 동일하게, 데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<Integer> sumPup = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>) (a, b) -> a + b); // 초기데이터 , 함수 전달
BiFunction.java
(BiFunction<Integer, Integer, Integer>) (a, b) -> a + b
- T : a 타입
- U : b 타입
- R : 최종 결과 타입
@FunctionalInterface
public interface BiFunction<T, U, R> {
R apply(T t, U u);
default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t, U u) -> after.apply(apply(t, u));
}
}
- reducePub()
private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int result = init;
@Override
public void onNext(Integer i) {
result = bf.apply(result, i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
Generic으로 구현하기
어떤 타입이든 받아서 처리할 수 있도록 하자.
DelegateGenericSub.java
@Slf4j
public class E08_DelegateGenericSub<T> implements Subscriber<T> {
private Subscriber sub;
public E08_DelegateGenericSub(Subscriber<? super T> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(T i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
1) Publisher 생성
Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);
2) 어떤 종류의 타입이든 받아서 처리하는 mapPub()
private static <T> Publisher<T> mapPub(Publisher<T> pub, Function<T, T> f) {
return new Publisher<T>() {
// pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
@Override
public void subscribe(Subscriber<? super T> sub) { // logSub
pub.subscribe(new E08_DelegateGenericSub<T>(sub) {
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
}); // 새로운 Subscriber
}
};
}
Output Generic으로 구현하기
어떤 타입이든 받아서 처리하고, 결과를 리턴하도록 하자.
DelegateGenericOutputSub.java
@Slf4j
public class E09_DelegateGenericOutputSub<T, R> implements Subscriber<T> {
private Subscriber sub;
public E09_DelegateGenericOutputSub(Subscriber<? super R> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(T i) {
sub.onNext(i);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
1) Publisher 생성
Publisher<String > mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" + s + "]");
Function.java
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
...
}
- mapPub()
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
return new Publisher<R>() {
// pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
@Override
public void subscribe(Subscriber<? super R> sub) { // logSub
pub.subscribe(new E09_DelegateGenericOutputSub<T, R>(sub) {
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
}); // 새로운 Subscriber
}
};
}
DelegateGenericOutputSub을 사용해여 reducePub을 다시 구현해보자.
덧셈, 곱셈 등의 식은 매개변수로 넘기도록 BiFunction 인터페이스를 사용한다.
sumPub()과 동일하게, 데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<String> sumPup = reducePub(pub, "", (a, b) -> a + b); // 초기데이터 , 함수 전달
- reducePub()
// 1, 2, 3, 4, 5
// 0 -> (0, 1) -> 0 + 1 = 1
// 1 -> (1, 2) -> 1 + 2 = 3
// 2 -> (3, 3) -> 3 + 3 = 6
// ...
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new E09_DelegateGenericOutputSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T i) {
result = bf.apply(result, i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
결과
22:13:21.873 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onSubscribe
22:13:21.895 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onNext:12345678910
22:13:21.896 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onComplete
Reactor 맛보기
build.gradle 추가
implementation 'io.projectreactor:reactor-core:3.4.23'
ReactorEx.java
public class ReactorEx {
public static void main(String[] args) {
Flux.<Integer>create(s -> {
s.next(1);
s.next(2);
s.next(3);
s.complete();
})
.log() // 오가는 데이터의 로그를 출력
.map(s -> s * 10)
.reduce(0, (a, b) -> a + b)
.log()
.subscribe(System.out::println); // onNext()
}
}
결과
22:12:41.828 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
22:12:41.829 [main] INFO reactor.Mono.ReduceSeed.2 - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
22:12:41.829 [main] INFO reactor.Mono.ReduceSeed.2 - | request(unbounded)
22:12:41.829 [main] INFO reactor.Flux.Create.1 - request(unbounded)
22:12:41.830 [main] INFO reactor.Flux.Create.1 - onNext(1)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onNext(2)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onNext(3)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onComplete()
22:12:41.831 [main] INFO reactor.Mono.ReduceSeed.2 - | onNext(60)
60
22:12:41.831 [main] INFO reactor.Mono.ReduceSeed.2 - | onComplete()
참고 : 토비의 봄 TV
반응형