반응형
728x90
반응형
subscribe()의 과정을 별도 스레드로 수행
1) publisher 생성
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
}; // pub
2) Subscriber 생성
subOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
3) pub-sub 연결할 Publisher 생성
subscribe의 과정을 별도 스레드로 수행하고 싶다.
Publihser 가 느린 경우, 이걸 처리하는 쪽은 빠른 경우, 이렇게하면 main 스레드는 블로킹 없이 바로 수행된다.
Publisher<Integer> subOnPub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
ExecutorService es = Executors.newSingleThreadExecutor(); // 코어 스레드 1, Maximum 스레드 1
es.execute(() -> pub.subscribe(sub));
}
};
- 한번에 1개의 스레드만 동작함을 보장한다.
ExecutorService es = Executors.newSingleThreadExecutor();
전체코드
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class E11_SubscribeSchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
}; // pub
// pub-sub 연결
Publisher<Integer> subOnPub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
ExecutorService es = Executors.newSingleThreadExecutor(); // 코어 스레드 1, Maximum 스레드 1
// pub.subscribe(sub); // 구독
es.execute(() -> pub.subscribe(sub));
}
};
// sub
subOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
// main 쓰레드는 위 로직이 모두 수행되고 나서야, exit 을 출력하고 종료한다.
log.info("exit");
}
}
수행 결과
[main] INFO com.reactive.step03.E11_SubscribeSchedulerEx - exit
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onSubscribe
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - request()
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 1
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 2
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 3
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 4
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onNext: 5
[pool-1-thread-1] DEBUG com.reactive.step03.E11_SubscribeSchedulerEx - onComplete
Subscriber 의 수행 메서드들을 별도 스레드로 수행
1) Publisher 생성
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
}; // pub
2) subscribe()
스레드명의 prefix를 직접 설정하고싶다.
Publisher<Integer> subOnPub = sub -> {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
// thread factory 커스텀 추가
@Override
public String getThreadNamePrefix() {
return "subOn-";
}
}); // 코어 스레드 1, Maximum 스레드 1
es.execute(() -> pub.subscribe(sub));
};
3) 별도 스레드로 수행되는 Subscriber 객체 생성하여 subscribe 인자로 넘기기
요청이 동시에 날라와도, 순서대로 수행된다.
하나의 Publihser가 데이터를 던져주는 것은 멀티 스레드로는 불가능하며, 단일 스레드로만 처리된다.
Publisher<Integer> pubOnPub = sub -> {
subOnPub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
// thread factory 커스텀
@Override
public String getThreadNamePrefix() {
return "pubOn-";
}
});
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {(integer);
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
// sub.onError(t);
es.execute(() -> sub.onError(t));
es.shutdown(); // 스레드풀 종료
}
@Override
public void onComplete() {
// sub.onComplete();
es.execute(() -> sub.onComplete());
es.shutdown();
}
});
};
4) Subscriber 생성
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
5) 관계 정리
Publisher | exec |
Publisher | pub sub.onSubscribe(Subscription) |
publisher | subOnPub pub.subscribe(sub) |
publisher | pubOnPub subOnPub.subscribe(sub) |
전체코드
package com.reactive.step03;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class E12_PublisherSchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.debug("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
}; // pub
Publisher<Integer> subOnPub = sub -> {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
// thread factory 커스텀 추가
@Override
public String getThreadNamePrefix() {
return "subOn-";
}
}); // 코어 스레드 1, Maximum 스레드 1
es.execute(() -> pub.subscribe(sub));
};
// pub-sub 연결
Publisher<Integer> pubOnPub = sub -> {
subOnPub.subscribe(new Subscriber<Integer>() {
ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
// thread factory 커스텀
@Override
public String getThreadNamePrefix() {
return "pubOn-";
}
});
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
// sub.onNext(integer);
es.execute(() -> sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
// sub.onError(t);
es.execute(() -> sub.onError(t));
es.shutdown(); // 스레드풀 종료
}
@Override
public void onComplete() {
// sub.onComplete();
es.execute(() -> sub.onComplete());
es.shutdown();
}
});
};
// sub
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.debug("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError: {}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
});
// main 쓰레드는 위 로직이 모두 수행되고 나서야, exit 을 출력하고 종료한다.
log.info("exit");
}
}
수행결과
[main] INFO com.reactive.step03.E12_PublisherSchedulerEx - exit
[subOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onSubscribe
[subOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - request()
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 1
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 2
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 3
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 4
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onNext: 5
[pubOn-1] DEBUG com.reactive.step03.E12_PublisherSchedulerEx - onComplete
Flux 예제 맛보기
- publisheOn 메서드를 사용하여 별도의 스레드로 수행한다.
- subscribeOn 메서드를 사용하여 별도의 스레드로 수행한다.
public static void main(String[] args) {
Flux.range(1, 10)
.publishOn(Schedulers.newSingle("pub")) // pub-1 thread
.log()
.subscribeOn(Schedulers.newSingle("sub")) // sub-1 thread
.subscribe(System.out::println);
System.out.println("exit"); // main thread
}
결과
exit
[sub-1] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[sub-1] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(1)
1
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(2)
2
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(3)
3
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(4)
4
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(5)
5
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(6)
6
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(7)
7
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(8)
8
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(9)
9
[pub-2] INFO reactor.Flux.PublishOn.1 - | onNext(10)
10
[pub-2] INFO reactor.Flux.PublishOn.1 - | onComplete()
Flux + interval() 맛보기 예제
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class E14_FluxScEx_interval {
public static void main(String[] args) throws InterruptedException {
Flux.interval(Duration.ofMillis(500))
.take(10) // 10개만 받고 끝낸다.
.subscribe(s -> log.debug("onNext:{}", s)); // 별도의 스레드에서 수행된다.
log.debug("exit1");
TimeUnit.SECONDS.sleep(5);
// main 스레드가 종료되도 이 스레드는 작업을 마치기 전까지는 JVM이 내려가지 않는다.
Executors.newSingleThreadExecutor().execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
System.out.println("Hello");
});
System.out.println("exit2");
}
}
1) interval()
주기를 가지고 계속 수행한다.
Flux.interval(Duration.ofMillis(500))
.take(10) // 10개만 받고 끝낸다.
.subscribe(s -> log.debug("onNext:{}", s)); // 별도의 스레드에서 수행된다.
2) 데몬 스레드
main 스레드에서 수행해보면 출력이 안된다. sleep() 추가 필요하다.
- interval : 데몬스레드를 만들기 때문이다.
- JVM이 유저 스레드가 하나도 없고 데몬 스레드만 있으면 강제로 종료시킨다.
TimeUnit.SECONDS.sleep(5);
3) main 스레드가 종료되도 이 스레드는 작업을 마치기 전까지는 JVM이 내려가지 않는다.
Executors.newSingleThreadExecutor().execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {}
System.out.println("Hello");
});
수행결과
[main] DEBUG com.reactive.step03.E14_FluxScEx_interval - exit1
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:0
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:1
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:2
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:3
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:4
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:5
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:6
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:7
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:8
[parallel-1] DEBUG com.reactive.step03.E14_FluxScEx_interval - onNext:9
exit2
Hello
참고 : 토비의 봄 TV
반응형