[Spring Reactive Programming] 3. Reactive Streams - Publisher, Subscriber 별도 스레드로 수행시키기

반응형
728x90
반응형

subscribe()의 과정을 별도 스레드로 수행

1) publisher 생성

Publisher<Integer> pub = sub -> {
    sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            log.debug("request()");
            sub.onNext(1);
            sub.onNext(2);
            sub.onNext(3);
            sub.onNext(4);
            sub.onNext(5);
            sub.onComplete();
        }

        @Override
        public void cancel() {

        }
    });
}; // pub

2) Subscriber 생성

subOnPub.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        log.debug("onNext: {}", integer);
    }

    @Override
    public void onError(Throwable t) {
        log.debug("onError: {}", t);
    }

    @Override
    public void onComplete() {
        log.debug("onComplete");
    }
});

 

3) pub-sub 연결할 Publisher 생성

subscribe의 과정을 별도 스레드로 수행하고 싶다.
Publihser 가 느린 경우, 이걸 처리하는 쪽은 빠른 경우, 이렇게하면 main 스레드는 블로킹 없이 바로 수행된다.

Publisher<Integer> subOnPub = new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> sub) {
        ExecutorService es = Executors.newSingleThreadExecutor(); // 코어 스레드 1, Maximum 스레드 1

        es.execute(() -> pub.subscribe(sub));
    }
};
  • 한번에 1개의 스레드만 동작함을 보장한다.
ExecutorService es = Executors.newSingleThreadExecutor();

 

 

전체코드

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class E11_SubscribeSchedulerEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    log.debug("request()");
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        }; // pub

        // pub-sub 연결
        Publisher<Integer> subOnPub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                ExecutorService es = Executors.newSingleThreadExecutor(); // 코어 스레드 1, Maximum 스레드 1

//                pub.subscribe(sub); // 구독
                es.execute(() -> pub.subscribe(sub));
            }
        };

        // sub
        subOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext: {}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError: {}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });

        // main 쓰레드는 위 로직이 모두 수행되고 나서야, exit 을 출력하고 종료한다.
        log.info("exit");
    }
}
수행 결과
[main] INFO com.reactive.step03.E11_SubscribeSchedulerEx - exit
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onSubscribe
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - request()
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 1
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 2
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 3
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 4
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 5
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onComplete

 

 

Subscriber 의 수행 메서드들을 별도 스레드로 수행 

1) Publisher 생성

Publisher<Integer> pub = sub -> {
    sub.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            log.debug("request()");
            sub.onNext(1);
            sub.onNext(2);
            sub.onNext(3);
            sub.onNext(4);
            sub.onNext(5);
            sub.onComplete();
        }

        @Override
        public void cancel() {

        }
    });
}; // pub

 

2) subscribe()

스레드명의 prefix를 직접 설정하고싶다.

Publisher<Integer> subOnPub = sub -> {
    ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
        // thread factory 커스텀 추가
        @Override
        public String getThreadNamePrefix() {
            return "subOn-";
        }
    }); // 코어 스레드 1, Maximum 스레드 1

    es.execute(() -> pub.subscribe(sub));
};

 

3) 별도 스레드로 수행되는 Subscriber 객체 생성하여 subscribe 인자로 넘기기

요청이 동시에 날라와도, 순서대로 수행된다.
하나의 Publihser가 데이터를 던져주는 것은 멀티 스레드로는 불가능하며, 단일 스레드로만 처리된다.

Publisher<Integer> pubOnPub = sub -> {
    subOnPub.subscribe(new Subscriber<Integer>() {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
            // thread factory 커스텀
            @Override
            public String getThreadNamePrefix() {
                return "pubOn-";
            }
        });

        @Override
        public void onSubscribe(Subscription s) {
            sub.onSubscribe(s);
        }

        @Override
        public void onNext(Integer integer) {(integer);
            es.execute(() -> sub.onNext(integer));
        }

        @Override
        public void onError(Throwable t) {
//                    sub.onError(t);
            es.execute(() -> sub.onError(t));
            es.shutdown(); // 스레드풀 종료
        }

        @Override
        public void onComplete() {
//                    sub.onComplete();
            es.execute(() -> sub.onComplete());
            es.shutdown();
        }
    });
};

 

4) Subscriber 생성

pubOnPub.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        log.debug("onNext: {}", integer);
    }

    @Override
    public void onError(Throwable t) {
        log.debug("onError: {}", t);
    }

    @Override
    public void onComplete() {
        log.debug("onComplete");
    }
});

 

5) 관계 정리

Publisher exec
Publisher pub
sub.onSubscribe(Subscription)
publisher subOnPub
pub.subscribe(sub)
publisher pubOnPub
subOnPub.subscribe(sub)

 

 

전체코드

package com.reactive.step03;

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class E12_PublisherSchedulerEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    log.debug("request()");
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        }; // pub

        Publisher<Integer> subOnPub = sub -> {
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                // thread factory 커스텀 추가
                @Override
                public String getThreadNamePrefix() {
                    return "subOn-";
                }
            }); // 코어 스레드 1, Maximum 스레드 1

            es.execute(() -> pub.subscribe(sub));
        };

        // pub-sub 연결
        Publisher<Integer> pubOnPub = sub -> {
            subOnPub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                    // thread factory 커스텀
                    @Override
                    public String getThreadNamePrefix() {
                        return "pubOn-";
                    }
                });

                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
//                    sub.onNext(integer);
                    es.execute(() -> sub.onNext(integer));
                }

                @Override
                public void onError(Throwable t) {
//                    sub.onError(t);
                    es.execute(() -> sub.onError(t));
                    es.shutdown(); // 스레드풀 종료
                }

                @Override
                public void onComplete() {
//                    sub.onComplete();
                    es.execute(() -> sub.onComplete());
                    es.shutdown();
                }
            });
        };

        // sub
        pubOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext: {}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError: {}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });

        // main 쓰레드는 위 로직이 모두 수행되고 나서야, exit 을 출력하고 종료한다.
        log.info("exit");
    }
}

 

수행결과
[main] INFO com.reactive.step03.E12_PublisherSchedulerEx - exit
[subOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onSubscribe
[subOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - request()
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 1
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 2
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 3
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 4
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 5
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onComplete

 

Flux 예제 맛보기

  • publisheOn 메서드를 사용하여 별도의 스레드로 수행한다.
  • subscribeOn 메서드를 사용하여 별도의 스레드로 수행한다.
public static void main(String[] args) {
    Flux.range(1, 10)
            .publishOn(Schedulers.newSingle("pub")) // pub-1 thread
            .log()
            .subscribeOn(Schedulers.newSingle("sub")) // sub-1 thread
            .subscribe(System.out::println);

    System.out.println("exit"); // main thread
}
결과
exit
[sub-1] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[sub-1] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(1)
1
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(2)
2
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(3)
3
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(4)
4
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(5)
5
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(6)
6
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(7)
7
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(8)
8
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(9)
9
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(10)
10
[pub-2] INFO reactor.Flux.PublishOn.1 - | onComplete()

 

 

Flux + interval() 맛보기 예제

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class E14_FluxScEx_interval {
    public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofMillis(500))
                .take(10) // 10개만 받고 끝낸다.
                .subscribe(s -> log.debug("onNext:{}", s)); // 별도의 스레드에서 수행된다.

        log.debug("exit1");
        TimeUnit.SECONDS.sleep(5);

        // main 스레드가 종료되도 이 스레드는 작업을 마치기 전까지는 JVM이 내려가지 않는다.
        Executors.newSingleThreadExecutor().execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {}

            System.out.println("Hello");
        });

        System.out.println("exit2");
    }
}

1) interval()

주기를 가지고 계속 수행한다.

Flux.interval(Duration.ofMillis(500))
        .take(10) // 10개만 받고 끝낸다.
        .subscribe(s -> log.debug("onNext:{}", s)); // 별도의 스레드에서 수행된다.

2) 데몬 스레드

main 스레드에서 수행해보면 출력이 안된다. sleep() 추가 필요하다.

  • interval : 데몬스레드를 만들기 때문이다.
    • JVM이 유저 스레드가 하나도 없고 데몬 스레드만 있으면 강제로 종료시킨다.
TimeUnit.SECONDS.sleep(5);

 

3) main 스레드가 종료되도 이 스레드는 작업을 마치기 전까지는 JVM이 내려가지 않는다.

Executors.newSingleThreadExecutor().execute(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {}

    System.out.println("Hello");
});

 

수행결과
[main] DEBUG com.reactive.step03.E14_FluxScEx_interval - exit1
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:0
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:1
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:2
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:3
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:4
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:5
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:6
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:7
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:8
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:9
exit2
Hello

 

 

참고 : 토비의 봄 TV

반응형

Designed by JB FACTORY