[Spring Reactive Programming] 1. Reactive Streams (Publisher, Subscriber, Pub/Sub)

반응형
728x90
반응형

Iterable

리스트를 생성해보자.
// 리스트로 받는 방법
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

 

List 객체는 Collection을 상속한다.
public interface List<E> extends Collection<E> {

 

Collection 객체는 Iterable을 상속한다. 

그러므로 List 타입은 Iterable의 서브타입이라고 할 수 있다.

public interface Collection<E> extends Iterable<E> {

 

Iterable은 아래와 같이 for-each 문에서 데이터를 순회할 수 있다.
for (Integer i : list) { // for-each
    System.out.println(i);
}

 

 

Iterable 객체를 직접 만들어보자.

▶ 1 ~ 9까지의 원소를 가지고, 이 원소를 순회하는 Iterator 오브젝트를 리턴하는 Iterable

-- Lamda
Iterable<Integer> iter2 = () -> new Iterator<>() {
    // Iterator 를 또 구현해야하는 이유는? Iterator 를 통해서 여러번 순회하기 위해서
    int i = 0; // 시작값
    final static int MAX = 10; // 종료값

    @Override
    public boolean hasNext() {
        return i < MAX;
    }

    @Override
    public Integer next() { // 받는쪽에서 호출하며, pull
        return ++i;
    }
};

-- 익명클래스
Iterable<Integer> iter2 = new Iterable<Integer>() {
    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<>() {
            // Iterator 를 또 구현해야하는 이유는? Iterator 를 통해서 여러번 순회하기 위해서
            int i = 0; // 시작값
            final static int MAX = 10; // 종료값

            @Override
            public boolean hasNext() {
                return i < MAX;
            }

            @Override
            public Integer next() { // 받는쪽에서 호출하며, pull
                return ++i;
            }
        };
    }
};

1) Iterable을 구현한 객체로 순회 가능하다.

for (Integer i : iter2) { // for-each 사용 가능 (List, Array가 아님에도 가능)
    System.out.println(i);
}

 

2) iter2.iterator()로 순회 가능하다.

for (Iterator<Integer> it = iter2.iterator(); it.hasNext();) {
    System.out.println(it.next());
}

 

▶ Iterable <- duality (상대성) -> Observable

Iterable Observable
Pull : 데이터를 가져오는것 Push :  데이터를 주는것 (통지하는것)

 

 

Observable

Observer 추가
// Observer 선언
Observer ob = new Observer() { // Subscriber
    @Override
    public void update(Observable o, Object arg) {
        System.out.println(Thread.currentThread().getName() + ":" + arg);
    }
};
        
// Observer 추가
IntObservable intObservable = new IntObservable();
intObservable.addObserver(ob);

// main 스레드가 아닌 별도의 스레드로 동작하도록
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(intObservable); // Runnable 구현 객체 
// intObservable.run();

// main 쓰레드
System.out.println(Thread.currentThread().getName() + " EXIT");

es.shutdown();

 

IntObservable.java (Publisher)
static class IntObservable extends Observable implements Runnable { // Publisher
    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            setChanged();
            notifyObservers(i); // 값을 던진다.
        }
    }
}

 

  • Observable.setChange()
protected synchronized void setChanged() {
    changed = true;
}

 

  • Observable.notifyObservers()
public void notifyObservers(Object arg) {

    Object[] arrLocal;

    synchronized (this) {
        if (!changed)
            return;
        arrLocal = obs.toArray();
        clearChanged();
    }

    for (int i = arrLocal.length-1; i>=0; i--)
        ((Observer)arrLocal[i]).update(this, arg);
}

 

출력결과
main EXIT
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9
pool-1-thread-1:10

 

 

Pub/Sub

Publisher Subscriber
연속된 요소들을 제공 정보를 요청하고, Publisher로부터 데이터를 받는다.
Publisher.subscribe(Subscriber) 로 구독  signal : Subscriber의 onSubscribe(), onNext(), onError(), onComplete()

 

1) Subscriber 생성

// Subscriber 를 만들어보자.
Flow.Subscriber<Integer> s  = new Flow.Subscriber<Integer>() {
    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) { // 필수
        System.out.println("E03_PubSub.onSubscribe");

        // subscription 저장
        this.subscription = subscription;

        // subscription의 reuqest()
//                subscription.request(Long.MAX_VALUE); // 전부 다 받기
        this.subscription.request(1); // 1개 받기
    }

    int bufferSize = 2;

    /**
     * publisher 에서 통지한 데이터를 처리
     * @param item the item
     */
    @Override
    public void onNext(Integer item) {
        System.out.println("E03_PubSub.onNext : " + item);

        // 다음 데이터를 다시 요청
        if (--bufferSize <= 0) {
            bufferSize = 2;
            this.subscription.request(2); // 2개 받기
        }
    }

    /**
     * 오류 처리 (Publisher 에서 어떤 종류의 에러가 발생하더라도 이 메서드에서 처리한다.)
     * @param throwable the exception
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println("E03_PubSub.onError");
    }

    @Override
    public void onComplete() {
        System.out.println("E03_PubSub.onComplete");
    }
};

 

2) [1, 2, 3, 4, 5]를 가진 리스트를 던지는 Publisher 생성

// Publisher 를 만들어보자.
List<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);

// 위 데이터를 던지는 Publisher
Flow.Publisher p = new Flow.Publisher() {
    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
            Iterator<Integer> it = itr.iterator();

            @Override
            public void request(long n) {
                try {
                    while (n-- > 0) {
                        if (it.hasNext()) {
                            subscriber.onNext(it.next()); // 데이터 통지
                        } else {
                            subscriber.onComplete(); // 통지 완료
                            break;
                        }
                    }
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                }
            }

            @Override
            public void cancel() {

            }
        });
    }
};

 

3) Publisher 객체의 subscribe(Subscriber 객체) 호출하여 구독

// 구독
p.subscribe(s);

 

 

호출 흐름

Subscriber -> Subscription(중개역할) -> Publisher -> 이벤트 발생 -> Subscriber

1) Publisher의 subscribe()에서 매개변수로 받은 Subscriber 객체의 onSubscribe(Subscription) 호출

  • new Flow.Subscription()로 Subscription 객체를 생성하고, request(), cancel() 메서드를 오버라이드 선언한다.
@Override
public void subscribe(Flow.Subscriber subscriber) {
    subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
        Iterator<Integer> it = itr.iterator();

        @Override
        public void request(long n) {
            try {
                // Subscriber.onSubscribe() 안에서 호출
                while (n-- > 0) {
                    if (it.hasNext()) {
                        subscriber.onNext(it.next()); // 데이터 통지
                    } else {
                        subscriber.onComplete(); // 통지 완료
                        break;
                    }
                }
            } catch (RuntimeException e) {
                // 에러 발생시 Subscriber 에서 처리
                subscriber.onError(e);
            }
        }

        @Override
        public void cancel() {

        }
    });
}

 

2) 호출된 Subscriber 객체의 onSubscribe() 에서 매개변수로 받은 Flow.Subscription 객체의 request() 호출한다.

  • 받을 데이터 개수를 설정할 수 있다.
  • subscription.request(1)는 '데이터 1개를 받는다'라는 의미다.
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
    System.out.println("E03_PubSub.onSubscribe");

    // subscription 저장
    this.subscription = subscription;

    // subscription의 reuqest()
//                subscription.request(Long.MAX_VALUE); // 전부 다 받기
    this.subscription.request(1); // 1개 받기
}

 

3) Scription 객체의 request() 메서드에서 onNext()를 통해 데이터를 통지하거나, onComplete()를 통해 통지를 완료한다.

@Override
public void request(long n) {
    try {
        // Subscriber.onSubscribe() 안에서 호출
        while (n-- > 0) {
            if (it.hasNext()) {
                subscriber.onNext(it.next()); // 데이터 통지
            } else {
                subscriber.onComplete(); // 통지 완료
                break;
            }
        }
    } catch (RuntimeException e) {
        // 에러 발생시 Subscriber 에서 처리
        subscriber.onError(e);
    }
}

 

4) onNext()가 호출되면 Publisher에서 통지된 데이터가 처리된다.

@Override
public void onNext(Integer item) {
    System.out.println("E03_PubSub.onNext : " + item);

    // 다음 데이터를 다시 요청
    if (--bufferSize <= 0) {
        bufferSize = 2;
        this.subscription.request(2); // 2개 받기
    }
}

 

5) Publisher에서 어떤 종류의 에러가 발생한다면 onError()가 호출되고, 통지가 끝났다면 onComplete()가 호출된다.

  • 그러므로, 둘 중 하나만 호출되겠다.
@Override
public void onError(Throwable throwable) {
    System.out.println("E03_PubSub.onError");
}

@Override
public void onComplete() {
    System.out.println("E03_PubSub.onComplete");
}

 

 

위 코드를 비동기로 변경해보자.

별도 스레드로 수행하기 위해 ExecutorService 객체를 선언하자.

// 별도 스레드
ExecutorService es = Executors.newSingleThreadExecutor();

https://devfunny.tistory.com/807?category=957918 

 

[JAVA8 병렬프로그래밍] Executors 클래스, ExecutorService 인터페이스

Executors 클래스 - Executor 인터페이스 : 컨커런트 API의 핵심 인터페이스다. 이 인터페이스를 구현한 여러 종류의 클래스를 기본으로 제공한다. - 스레드 풀 : 스레드를 관리하기 위한 풀이다. 병렬

devfunny.tistory.com

 

Publisher는 쓰레드는 여러개 만들어서 데이터를 통지하는 것이 스펙상으로 불가능하다.
하나의 Subscriber는 순서대로 데이터가 통지됨으로 알고있다.
어느 한순간에는 적어도 한 스레드에서만 데이터가 날라올 수 있다고 본다. 동시에 다른 스레드에서 멀티 스레드 방식으로 동작하는건 가능하다.

 

1) Publisher 생성

Future 객체로 리턴받는다. 자바 비동기 작업의 결과 정보를 제공해주는 클래스다.

Flow.Publisher p = new Flow.Publisher() {
    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() { // Subscriber.onSubscribe() 호출
            Iterator<Integer> it = itr.iterator();

            @Override
            public void request(long n) {
                Future<?> submit = es.submit(() -> {
                    int i = 0;

                    try {
                        // Subscriber.onSubscribe() 안에서 호출
                        while (i++ < n) {
                            if (it.hasNext()) {
                                subscriber.onNext(it.next()); // 데이터 통지
                            } else {
                                subscriber.onComplete(); // 통지 완료
                                break;
                            }
                        }
                    } catch (RuntimeException e) {
                        subscriber.onError(e);
                    }
                });
            }

            @Override
            public void cancel() { // 작업을 취소시킬 수 있다. 이때 Future를 통해서 interrupt를 날릴 수 있다.

            }
        });
    }
};

 

2) Subscriber 생성

  • onSubscribe() 메서드는 subscribe()를 호출한 동일한 스레드 내에서 처리되어야한다.
    • 즉, main 스레드에서 수행되어야한다.
Flow.Subscriber<Integer> s  = new Flow.Subscriber<Integer>() {
    Flow.Subscription subscription;

    /*
        subscribe()한 동일한 쓰레드 내에서 처리되야한다.
        -> main 스레드
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) { // 필수
        System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");

        // subscription 저장
        this.subscription = subscription;

        // subscription의 reuqest()
//                subscription.request(Long.MAX_VALUE); // 전부 다 받기

        // 이것도 동일한 쓰레드 내에서 처리되야한다. (새로운 스레드 만들어서 request 날리면 안된다.)
        this.subscription.request(1); // 1개 받기
    }

    int bufferSize = 2;

    /**
     * publisher 에서 통지한 데이터를 처리
     * @param item the item
     */
    @Override
    public void onNext(Integer item) {
        System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onNext : " + item);

        // 다음 데이터를 다시 요청
        if (--bufferSize <= 0) {
            bufferSize = 2;
            this.subscription.request(2); // 2개 받기
        }
    }

    /**
     * 오류 처리 (Publisher 에서 어떤 종류의 에러가 발생하더라도 이 메서드에서 처리한다.)
     * @param throwable the exception
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println("E03_PubSub.onError : " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("E03_PubSub.onComplete");
    }
};

 

3) p.subscribe(s) 를 호출하여 구독 시작 

public static void main(String[] args) throws InterruptedException {
    ...
    // 구독
    p.subscribe(s);

    // time 걸어서 동작중에 shutdown 되지 않도록 한다.
    es.awaitTermination(10, TimeUnit.HOURS);
    es.shutdown();
}

 

결과
main : E03_PubSub.onSubscribe
pool-1-thread-1 : E03_PubSub.onNext : 1

 

동기 vs 비동기 결과 비교

Subscriber의 onSubscribe() 메서드에 각각 아래 한줄을 추가하여 결과를 비교해보자.

System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
@Override
public void onSubscribe(Flow.Subscription subscription) { // 필수
    System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");

    // subscription 저장
    this.subscription = subscription;

    // subscription의 reuqest()
//                subscription.request(Long.MAX_VALUE); // 전부 다 받기

    // 이것도 동일한 쓰레드 내에서 처리되야한다. (새로운 스레드 만들어서 request 날리면 안된다.)
    this.subscription.request(1); // 1개 받기

    System.out.println(Thread.currentThread().getName() + " : E03_PubSub.onSubscribe");
}
동기 비동기
E03_PubSub.onSubscribe
E03_PubSub.onNext : 1
main : E03_PubSub.onSubscribe
main : E03_PubSub.onSubscribe
main : E03_PubSub.onSubscribe
pool-1-thread-1 : E03_PubSub.onNext : 1

 

 

참고 : 토비의 봄 TV

 

 

흐름 정리

https://pearlluck.tistory.com/730

반응형

Designed by JB FACTORY