[Spring Reactive Programming] 2. Reactive Streams - Operators (sum, reduce, Generic으로 구현)

반응형
728x90
반응형

Pub/Sub

1) Publisher 생성

Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
                            .limit(10)
                            .collect(Collectors.toList()));
  • iterPub()
private static Publisher<Integer> iterPub(List<Integer> iter) {
    Publisher<Integer> pub = new Publisher<Integer>() {
        // Publisher 의 구현해야하는 메서드
        @Override
        public void subscribe(Subscriber<? super Integer> sub) { // 호출하면 그때부터 데이터를 통지
            // Subscription : Publisher, Subscriber 둘 사이의 구독이 한번 일어난다는 의미
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    try {
                        // iterable 의 원소를 통지한다.
                        iter.forEach(s -> sub.onNext(s));
                        // 여기서 멈추면 안되고, publisher 가 데이터 통지가 완료했으면 완료됨을 호출해야한다.
                        sub.onComplete();
                    } catch (Throwable t) {
                        // 에러 처리
                        sub.onError(t);
                    }
                }

                /**
                 * Subscriber 에서 Subscription 객체의 cancel()을 호출할 수 있다.
                 * 더이상 데이터를 통지받지 않겠다고 알림
                 */
                @Override
                public void cancel() {

                }
            });
        }
    };
    return pub;
}

 

2) Subscriber 생성

// 구독자
Subscriber<Integer> sub = logSub();
  • logSub()
private static Subscriber<Integer> logSub() {
    Subscriber<Integer> sub = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            // Subscription 의 request 를 요청해야한다.
            log.debug("onSubscribe");
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer i) {
            log.debug("onNext:{}", i);
        }

        @Override
        public void onError(Throwable t) {
            log.debug("onError:{}", t);
        }

        @Override
        public void onComplete() {
            log.debug("onComplete");
        }
    };
    
    return sub;
}

 

3) 구독 시작

// 구독 시작
pub.subscribe(sub);

 

 

리팩토링 - DeleagateSub 파일 생성

DeleagateSub.java
package com.reactive.step02;

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j
public class DelegateSub implements Subscriber<Integer> {
    private Subscriber sub;
    
    public DelegateSub(Subscriber sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(Integer i) {
        sub.onNext(i);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}

 

▶ Subscriber 객체를 생성할때 재정의가 필요한 메서드만 선언한다.

pub.subscribe(new DelegateSub(sub) {
    @Override
    public void onNext(Integer i) {
        sub.onNext(f.apply(i));
    }
}); // 새로운 Subscriber

 

 

2단계 적용

pub -> data1 -> mapPub -> data2 -> logSub

1) Publisher 생성

Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);
  • mapPub()
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
    return new Publisher<Integer>() {
        // pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
        @Override
        public void subscribe(Subscriber<? super Integer> sub) { // logSub
            pub.subscribe(new DelegateSub(sub) {
                @Override
                public void onNext(Integer i) {
                    sub.onNext(f.apply(i));
                }
            }); // 새로운 Subscriber
        }
    };
}

 

2) 구독 시작

mapPub.subscribe(sub);

 

결과
21:36:49.407 [main] DEBUG com.reactive.step02.E05_PubSub - onSubscribe
21:36:49.408 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:10
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:20
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:30
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:40
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:50
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:60
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:70
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:80
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:90
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onNext:100
21:36:49.409 [main] DEBUG com.reactive.step02.E05_PubSub - onComplete

 

 

sumPub

데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<Integer> sumPup = sumPub(pub);
  • sumPub()
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
    return new Publisher<Integer>() {
        // 중개 역할
        @Override
        public void subscribe(Subscriber<? super Integer> sub) {
            pub.subscribe(new DelegateSub(sub) {
                int sum = 0;
                @Override
                public void onNext(Integer i) {
                    // 넘기면 안된다.
                    sum += i;

                    // 결과를 넘기는건 모든 데이터의 합계를 완료했을 때
                    // 완료를 알 수 있는 법 : onComplete()
                }

                /**
                 * Publisher가 완료의 신호를 줄때, 그때 Sub의 onNext를 호출해서 데이터를 한번 전달한다.
                 */
                @Override
                public void onComplete() {
                    sub.onNext(sum);
                    sub.onComplete();
                }
            });
        }
    };
}

1) onComplete() 안에서 onNext(), onComplete()를 한번에 호출한다.

/**
 * Publisher가 완료의 신호를 줄때, 그때 Sub의 onNext를 호출해서 데이터를 한번 전달한다.
 */
@Override
public void onComplete() {
    sub.onNext(sum);
    sub.onComplete();
}

 

 

ReducePub

덧셈, 곱셈 등의 식은 매개변수로 넘기도록 BiFunction 인터페이스를 사용한다.
sumPub()과 동일하게, 데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<Integer> sumPup = reducePub(pub, 0, (BiFunction<Integer, Integer, Integer>) (a, b) -> a + b); // 초기데이터 , 함수 전달

 

BiFunction.java
(BiFunction<Integer, Integer, Integer>) (a, b) -> a + b
  • T : a 타입
  • U : b 타입
  • R : 최종 결과 타입 
@FunctionalInterface
public interface BiFunction<T, U, R> {
    R apply(T t, U u);

    default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t, U u) -> after.apply(apply(t, u));
    }
}
  • reducePub()
private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
    return new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> sub) {

            pub.subscribe(new DelegateSub(sub) {
                int result = init;

                @Override
                public void onNext(Integer i) {
                    result = bf.apply(result, i);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);

                    sub.onComplete();
                }
            });
        }
    };
}

 

 

 

Generic으로 구현하기

어떤 타입이든 받아서 처리할 수 있도록 하자.
DelegateGenericSub.java
@Slf4j
public class E08_DelegateGenericSub<T> implements Subscriber<T> {
    private Subscriber sub;

    public E08_DelegateGenericSub(Subscriber<? super T> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(T i) {
        sub.onNext(i);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}

 

1) Publisher 생성

Publisher<Integer> mapPub = mapPub(pub, (Function<Integer, Integer>) s -> s * 10);

 


2) 어떤 종류의 타입이든 받아서 처리하는 mapPub()

private static <T> Publisher<T> mapPub(Publisher<T> pub, Function<T, T> f) {
    return new Publisher<T>() {
        // pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
        @Override
        public void subscribe(Subscriber<? super T> sub) { // logSub
            pub.subscribe(new E08_DelegateGenericSub<T>(sub) {
                @Override
                public void onNext(T i) {
                    sub.onNext(f.apply(i));
                }
            }); // 새로운 Subscriber
        }
    };
}

 

 

Output Generic으로 구현하기

어떤 타입이든 받아서 처리하고, 결과를 리턴하도록 하자.
DelegateGenericOutputSub.java
@Slf4j
public class E09_DelegateGenericOutputSub<T, R> implements Subscriber<T> {
    private Subscriber sub;

    public E09_DelegateGenericOutputSub(Subscriber<? super R> sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(T i) {
        sub.onNext(i);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}

 

1) Publisher 생성

Publisher<String > mapPub = mapPub(pub, (Function<Integer, String>) s -> "[" + s + "]");

 

Function.java
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);

   ...
}
  • mapPub()
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
    return new Publisher<R>() {
        // pub -> data1 -> mapPub(Subscriber 생성) -> data2 -> logSub
        @Override
        public void subscribe(Subscriber<? super R> sub) { // logSub
            pub.subscribe(new E09_DelegateGenericOutputSub<T, R>(sub) {
                @Override
                public void onNext(T i) {
                    sub.onNext(f.apply(i));
                }
            }); // 새로운 Subscriber
        }
    };
}

 

DelegateGenericOutputSub을 사용해여 reducePub을 다시 구현해보자.

덧셈, 곱셈 등의 식은 매개변수로 넘기도록 BiFunction 인터페이스를 사용한다.
sumPub()과 동일하게, 데이터가 통지되어도 onNext()를 호출하는게 아닌, 모든 통지된 데이터의 합계를 완료했을때 마지막에 onNext()를 호출한다.
Publisher<String> sumPup = reducePub(pub, "", (a, b) -> a + b); // 초기데이터 , 함수 전달
  • reducePub()
// 1, 2, 3, 4, 5
// 0 -> (0, 1) -> 0 + 1 = 1
// 1 -> (1, 2) -> 1 + 2 = 3
// 2 -> (3, 3) -> 3 + 3 = 6
// ...
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Publisher<R>() {
        @Override
        public void subscribe(Subscriber<? super R> sub) {

            pub.subscribe(new E09_DelegateGenericOutputSub<T, R>(sub) {
                R result = init;

                @Override
                public void onNext(T i) {
                    result = bf.apply(result, i);
                }

                @Override
                public void onComplete() {
                    sub.onNext(result);

                    sub.onComplete();
                }
            });
        }
    };
}

 

결과
22:13:21.873 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onSubscribe
22:13:21.895 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onNext:12345678910
22:13:21.896 [main] DEBUG com.reactive.step02.E10_GenericReducePub - onComplete

 

 

Reactor 맛보기

build.gradle 추가
implementation 'io.projectreactor:reactor-core:3.4.23'

 

ReactorEx.java
public class ReactorEx {
    public static void main(String[] args) {
        Flux.<Integer>create(s -> {
            s.next(1);
            s.next(2);
            s.next(3);
            s.complete();
        })
        .log() // 오가는 데이터의 로그를 출력
        .map(s -> s * 10)
        .reduce(0, (a, b) -> a + b)
        .log()
        .subscribe(System.out::println); // onNext()
    }
}

 

결과
22:12:41.828 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
22:12:41.829 [main] INFO reactor.Mono.ReduceSeed.2 - | onSubscribe([Fuseable] MonoReduceSeed.ReduceSeedSubscriber)
22:12:41.829 [main] INFO reactor.Mono.ReduceSeed.2 - | request(unbounded)
22:12:41.829 [main] INFO reactor.Flux.Create.1 - request(unbounded)
22:12:41.830 [main] INFO reactor.Flux.Create.1 - onNext(1)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onNext(2)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onNext(3)
22:12:41.831 [main] INFO reactor.Flux.Create.1 - onComplete()
22:12:41.831 [main] INFO reactor.Mono.ReduceSeed.2 - | onNext(60)
60
22:12:41.831 [main] INFO reactor.Mono.ReduceSeed.2 - | onComplete()

 

 

 

참고 : 토비의 봄 TV

반응형

Designed by JB FACTORY