Reactive Streams
- Coding/Reactive
- 2020. 11. 6.
Reactuve Streans를 들어가기 이전
Reactive Streams 포스팅을 들어가기전, Reactive Spring의 개념에 대해 알아야한다.
- 포스팅 바로가기 : devfunny.tistory.com/314?category=820617
Reactive Streams
Reactive Streams란 라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘이다. Reactive Streams 사용을 위해 제공된 인터페이스를 사용하여 직접 구현이 가능하다.
구성
Publisher : 데이터를 통지하는 생산자
Subscriber : 데이터를 받아 처리하는 소비자
Reactive Streams는 데이터를 만들어 통지하는 Publisher(생산자)와 통지된 데이터를 받아 처리하는 Subscriber(소비자)로 구성된다. Subscriber가 Publisher를 구독(subScriber)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있다.
Subscriber -> Publisher(생산자)를 구독(subscribe) -> Publisher(생산자)가 Subscriber(소비자)에게 데이터 통지 가능
Reactive Streams의 흐름
1) SubScriber 데이터 통지 준비 끝 -> 준비가 끝났다는 사실을 Subscriber에 통지함(onSubscribe)
2) 통지를 받은 Subscriber는 받고자 하는 데이터 개수 요청
Subscriber가 자신이 받을 데이터 개수를 요청하지 않으면 Publisher는 통지해야 할 데이터 개수 요청을 기다리게되어 통지를 시작할 수 없다. (block)
* 데이터 개수 요청 이유? Publisher가 통지하는 데이터 개수를 제어하기 위함
Publisher와 Subscriber의 처리가 각각 다른 스레드에서 진행되는데 Publisher의 통지 속도가 빠르면 Subscriber가 소화할 수 없을 만큼의 많은 데이터가 쌓이게되고, 이를 막기위해 Subscriber가 처리할 수 있을 만큼의 데이터만큼 제어하는 수단이 필요하다.
3) Publisher는 데이터를 만들어 Subscriber에 통지함 (onNext)
4) Subscriber는 받은 데이터를 사용해 처리 작업을 수행
5) Publisher는 요청받은 만큼의 데이터를 통지한 뒤 Subscriber로부터 다음 요청이 올때까지 데이터 통지를 중단
6) Subscriber가 처리 작업을 완료하면 다음에 받을 데이터 개수를 Publisher에 요청
이 요청을 보내지 않으면 Publisher는 요청 대기 상태가 됨 -> Subscriber에 데이터를 통지할 수 없다.
7) Publisher는 Subscriber에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료되어 정산 종료됨을 통지 (onComplete)
8) 완료 통지 후 Publisher는 이 구독건에 대해 어떤 통지도 하지 않고, 만약 에러가 발생한다면 에러 발생을 에러 객체와 함께 통지 (onError)
Reactive Streams의 제공 프로토콜
1) onSubscribe : 데이터 통지가 준비됐음을 통지
2) onNext : 데이터 통지
3) onError : 에러 통지
4) onComplete : 완료 통지
Reactive Streams가 제공하는 인터페이스
1) Publisher : 데이터를 생성하고 통지하는 인터페이스
2) Subscriber : 통지된 데이터를 전달받아 처리하는 인터페이스
3) Subscription : 데이터 개수를 요청하고 구독을 해지하는 인터페이스
4) Processor : Publisher와 Suscriber의 기능이 모두 있는 인터페이스
Reactive Streams 예제
위 Ractive Streams의 제공 프로토콜과 인터페이스가 어떻게 사용되고 어떻게 구현되는지 간단하게 예제로 이해해보자.
1. Publisher.java (데이터를 통지하는 생산자)
public interface Publisher<T> {
//데이터를 받는 Subscriber 등록
public void subscribe(Subscriber <? super T> subscriber);
}
2. Subscriber.java (데이터를 받아 처리하는 소비자)
public interface Subscriber<T> {
//구독 시작 처리
public void onSubscribe(Subscription subscription); // 아래 Subscription을 인자로 전달
//데이터 통지시 처리
public void onNext(T item);
//에러 통지시 처리
public void onError(Throwable error);
//완료 통지시 처리
public void onComplete();
}
3. Subscription.java (생산자와 소비자를 연결하는 인터페이스)
public interface Subscription { // 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할때 사용하는 인터페이스
//통지받을 데이터 개수 요청
public void request(long num);
//구독 해지
public void cancel(); // Subscriber에서 호출함 (Subscription을 받은 Subscriber에서 구독 해지를 위해 호출)
}
4. Processor.java (Publisher/Subscriber의 기능이 모두 있는 인터페이스)
// Publisher와 Subscriber가 사용하는 Subscription은
public abstract interface Processor<T, R> extends Subscriber<T>, Publichser<R> {}
Reactive Streams의 규칙
1) 구독 시작 통지(onSubscribe)는 해당 구독에서 한번만 발생한다.
2) 통지는 순차적으로 이루어진다.
-> 여러 통지를 동시에 할 수 없다. (데이터의 불일치를 방지)
3) null을 통지하지 않는다.
-> null을 통지하면 NullPointerException이 발생된다.
4) Publisher의 처리는 완료(omComplete) 또는 에러(onError)를 통지하여 종료한다.