Reactive Streams

반응형
728x90
반응형

Reactuve Streans를 들어가기 이전

Reactive Streams 포스팅을 들어가기전, Reactive Spring의 개념에 대해 알아야한다.

 

Reactive Spring / Reactive Programming

Reactive 의 예시 Reactive의 뜻은 반응형이다. 리액티브를 이해하기 위해서, 하나의 상황을 예로 들어보자. 한 어플리케이션에서 시간당 평균 약 1,000명의 사용자가 방문한다고 가정해보자. 톰캣을

devfunny.tistory.com

 

 

Reactive Streams

Reactive Streams란 라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘이다. Reactive Streams 사용을 위해 제공된 인터페이스를 사용하여 직접 구현이 가능하다.

구성
Publisher : 데이터를 통지하는 생산자
Subscriber : 데이터를 받아 처리하는 소비자

 

Reactive Streams는 데이터를 만들어 통지하는 Publisher(생산자)와 통지된 데이터를 받아 처리하는 Subscriber(소비자)로 구성된다. Subscriber가 Publisher를 구독(subScriber)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있다.

 

Subscriber -> Publisher(생산자)를 구독(subscribe) -> Publisher(생산자)가 Subscriber(소비자)에게 데이터 통지 가능

 

 

 

Reactive Streams의 흐름

1) SubScriber 데이터 통지 준비 끝 -> 준비가 끝났다는 사실을 Subscriber에 통지함(onSubscribe)

2) 통지를 받은 Subscriber는 받고자 하는 데이터 개수 요청

Subscriber가 자신이 받을 데이터 개수를 요청하지 않으면 Publisher는 통지해야 할 데이터 개수 요청을 기다리게되어 통지를 시작할 수 없다. (block)

* 데이터 개수 요청 이유? Publisher가 통지하는 데이터 개수를 제어하기 위함

Publisher와 Subscriber의 처리가 각각 다른 스레드에서 진행되는데 Publisher의 통지 속도가 빠르면 Subscriber가 소화할 수 없을 만큼의 많은 데이터가 쌓이게되고, 이를 막기위해 Subscriber가 처리할 수 있을 만큼의 데이터만큼 제어하는 수단이 필요하다.

 

3) Publisher는 데이터를 만들어 Subscriber에 통지함 (onNext)

 

4) Subscriber는 받은 데이터를 사용해 처리 작업을 수행

 

5) Publisher는 요청받은 만큼의 데이터를 통지한 뒤 Subscriber로부터 다음 요청이 올때까지 데이터 통지를 중단

 

6) Subscriber가 처리 작업을 완료하면 다음에 받을 데이터 개수를 Publisher에 요청
이 요청을 보내지 않으면 Publisher는 요청 대기 상태가 됨 -> Subscriber에 데이터를 통지할 수 없다.

 

7) Publisher는 Subscriber에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료되어 정산 종료됨을 통지 (onComplete)

 

8) 완료 통지 후 Publisher는 이 구독건에 대해 어떤 통지도 하지 않고, 만약 에러가 발생한다면 에러 발생을 에러 객체와 함께 통지 (onError)

 

 

 

Reactive Streams의 제공 프로토콜

1) onSubscribe : 데이터 통지가 준비됐음을 통지
2) onNext : 데이터 통지
3) onError : 에러 통지
4) onComplete : 완료 통지

 

 

 

Reactive Streams가 제공하는 인터페이스

1) Publisher : 데이터를 생성하고 통지하는 인터페이스
2) Subscriber : 통지된 데이터를 전달받아 처리하는 인터페이스
3) Subscription : 데이터 개수를 요청하고 구독을 해지하는 인터페이스
4) Processor : Publisher와 Suscriber의 기능이 모두 있는 인터페이스

 

 

 

Reactive Streams 예제

위 Ractive Streams의 제공 프로토콜과 인터페이스가 어떻게 사용되고 어떻게 구현되는지 간단하게 예제로 이해해보자.

 

1. Publisher.java (데이터를 통지하는 생산자)

public interface Publisher<T> {
    //데이터를 받는 Subscriber 등록
    public void subscribe(Subscriber <? super T> subscriber);
}

 

2. Subscriber.java (데이터를 받아 처리하는 소비자)

public interface Subscriber<T> {
    //구독 시작 처리
    public void onSubscribe(Subscription subscription); // 아래 Subscription을 인자로 전달
    
    //데이터 통지시 처리
    public void onNext(T item);
    
    //에러 통지시 처리
    public void onError(Throwable error);

    //완료 통지시 처리
    public void onComplete();
}

 

3. Subscription.java (생산자와 소비자를 연결하는 인터페이스)

public interface Subscription { // 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할때 사용하는 인터페이스
    //통지받을 데이터 개수 요청
    public void request(long num);
    
    //구독 해지
    public void cancel(); // Subscriber에서 호출함 (Subscription을 받은 Subscriber에서 구독 해지를 위해 호출)
}

 

4. Processor.java (Publisher/Subscriber의 기능이 모두 있는 인터페이스)

// Publisher와 Subscriber가 사용하는 Subscription은 
public abstract interface Processor<T, R> extends Subscriber<T>, Publichser<R> {}

 

 

 

Reactive Streams의 규칙

1) 구독 시작 통지(onSubscribe)는 해당 구독에서 한번만 발생한다.

 

2) 통지는 순차적으로 이루어진다.
-> 여러 통지를 동시에 할 수 없다. (데이터의 불일치를 방지)

 

3) null을 통지하지 않는다.
-> null을 통지하면 NullPointerException이 발생된다.

 

4) Publisher의 처리는 완료(omComplete) 또는 에러(onError)를 통지하여 종료한다.

 

 

반응형

Designed by JB FACTORY