[리액티브 프로그래밍] Reactor의 debugging
- Coding/Reactive
- 2023. 4. 30.
Reactor의 디버깅
Reactor는 처리되는 작업들이 대부분 비동기적으로 실행되고, Reactor Sequence는 선언형 프로그래밍 방식으로 구성되므로 디버깅이 쉽지않다. Reactor에서는 디버깅을 할 수 있는 방법을 몇가지 제공한다.
Debug Mode
Debug Mode를 활성화해서 Reactor Sequence를 디버깅할 수 있다.
@Slf4j
public class Example12_1 {
public static Map<String, String> fruits = new HashMap<>();
static {
fruits.put("banana", "바나나");
fruits.put("apple", "사과");
fruits.put("pear", "배");
fruits.put("grape", "포도");
}
public static void main(String[] args) throws InterruptedException {
Hooks.onOperatorDebug();
Flux
.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.map(String::toLowerCase)
.map(fruit -> fruit.substring(0, fruit.length() - 1))
.map(fruits::get)
.map(translated -> "맛있는 " + translated)
.subscribe(
log::info,
error -> log.error("# onError:", error));
Thread.sleep(100L);
}
}
1) 디버그 모드 활성화
Hooks.onOperatorDebug();
2) 비교를 위해 위 코드를 주석해보고 실행해보자.
// Hooks.onOperatorDebug();
실행결과 (디버그 모드 비활성화)
13:49:33.878 [parallel-1] INFO - 맛있는 바나나
13:49:33.878 [parallel-1] INFO - 맛있는 사과
13:49:33.878 [parallel-1] INFO - 맛있는 배
13:49:33.894 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$58/0x0000000800162440] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
실행결과 (디버그 모드 활성화)
13:42:15.817 [main] DEBUG- Using Slf4j logging framework
13:42:15.819 [main] DEBUG- Enabling stacktrace debugging via onOperatorDebug
13:42:15.904 [parallel-1] INFO - 맛있는 바나나
13:42:15.906 [parallel-1] INFO - 맛있는 사과
13:42:15.906 [parallel-1] INFO - 맛있는 배
13:42:15.914 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$66/0x0000000800164440] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
reactor.core.publisher.Flux.map(Flux.java:6271)
chapter12.Example12_1.main(Example12_1.java:35)
Error has been observed at the following site(s):
*__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
|_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)
Original Stack Trace:
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Operator 체인상에서 에러가 발생한 지점을 정확히 가리키고 있고, 에러가 시작된 지점부터 에러 전파 상태를 표시해주고있다.
Debug Mode의 한계
Hooks.onOperatorDebug();
디버그 모드의 활성화는 애플리케이션 내에서 비용이 많이 드는 동작 과정을 거친다.
- 애플리케이션 내에 있는 모든 Operator의 스택트레이스(Stacktrace)를 캡처한다.
- 에러가 발생하면 캡처한 정보를 기반으로 에러가 발생한 Assembly(리턴하는 새로운 Mono 또는 Flux가 선언된 지점)의 스택트레이스를 원본 스택트레이스 중간에 끼워넣는다.
따라서 에러 원인을 추적하기 위해 처음부터 디버그 모드를 활성화하는 것은 권장하지 않는다.
checkpoint() Operator
특정 Operator 체인 내의 스택트레이스만 캡처하는 방법이다.
traceback
디버그 모드를 활성화하면 Operator의 Assembly 정보를 캡처하는데 이중에서 에러가 발생한 Operator의 스택트레이스를 캡처한 Assembly 정보를 말한다.
[1번째 방법] Traceback을 출력하는 방법
checkpoint()를 사용하면 실제 에러가 발생한 assembly 지점 또는 에러가 전파된 assembly 지점의 traceback이 추가된다.
@Slf4j
public class Example12_2 {
public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
.map(num -> num + 2)
.checkpoint()
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
}
1) map() Operator 바로 다음 위치에 checkpoint()가 존재한다.
.map(num -> num + 2)
.checkpoint()
실행결과
14:01:46.112 [main] INFO - # onNext: 4
14:01:46.112 [main] INFO - # onNext: 4
14:01:46.112 [main] INFO - # onNext: 4
14:01:46.121 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_2.lambda$main$0(Example12_2.java:15)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMap] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
chapter12.Example12_2.main(Example12_2.java:17)
Error has been observed at the following site(s):
*__checkpoint() ⇢ at chapter12.Example12_2.main(Example12_2.java:17)
Original Stack Trace:
at chapter12.Example12_2.lambda$main$0(Example12_2.java:15)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_2.main(Example12_2.java:18)
ArithmeticException이 발생한것을 알 수 있다. 이 에러가 map() Operator 다음에 추가한 checkpoint() 지점까지는 에러가 전파되었다는 것을 예상할 수 있다.
checkpoint()를 하나 더 추가하자.
@Slf4j
public class Example12_3 {
public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
.checkpoint()
.map(num -> num + 2)
.checkpoint()
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
}
1) 에러가 예상되는 지점을 정확하게 찾기 위해 map() Operator 위에 checkpoint()를 추가했다.
.checkpoint()
.map(num -> num + 2)
.checkpoint()
실행결과
14:22:52.935 [main] INFO - # onNext: 4
14:22:52.936 [main] INFO - # onNext: 4
14:22:52.936 [main] INFO - # onNext: 4
14:22:52.944 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxZip] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
chapter12.Example12_3.main(Example12_3.java:16)
Error has been observed at the following site(s):
*__checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:16)
|_ checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:18)
Original Stack Trace:
at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_3.main(Example12_3.java:19)
첫번째 checkpoint()는 zipWith() Operator에서 직접적으로 에러가 발생했음을 예상할 수 있다. 그러므로 이제는 zipWith() 내부의 처리 로직에서 에러가 발생한 원인을 찾으면된다.
[2번째 방법] Trackback 출력 없이 식별자를 포함한 Description을 출력해서 에러 발생 지점을 예상하는 방법
@Slf4j
public class Example12_4 {
public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
.checkpoint("Example12_4.zipWith.checkpoint")
.map(num -> num + 2)
.checkpoint("Example12_4.map.checkpoint")
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
}
1) checkpoint(description)을 사용하여 description을 통해 에러 발생 지점을 예상할 수 있다.
.checkpoint("Example12_4.map.checkpoint")
실행결과
14:29:39.144 [main] INFO - # onNext: 4
14:29:39.145 [main] INFO - # onNext: 4
14:29:39.145 [main] INFO - # onNext: 4
14:29:39.151 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_4.lambda$main$0(Example12_4.java:16)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Example12_4.zipWith.checkpoint
|_ checkpoint ⇢ Example12_4.map.checkpoint
Original Stack Trace:
at chapter12.Example12_4.lambda$main$0(Example12_4.java:16)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_4.main(Example12_4.java:20)
[3번째 방법] Trackback과 Description을 모두 출력하는 방법
@Slf4j
public class Example12_5 {
public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
.checkpoint("Example12_4.zipWith.checkpoint", true)
.map(num -> num + 2)
.checkpoint("Example12_4.map.checkpoint", true)
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
}
1) checkpoint(description, forceStackTrace)를 사용하면 description과 Traceback을 모두 출력할 수 있다.
두번째 파라미터 값을 true로 설정하면 된다.
.checkpoint("Example12_4.zipWith.checkpoint", true)
실행결과
14:31:49.277 [main] INFO - # onNext: 4
14:31:49.277 [main] INFO - # onNext: 4
14:31:49.278 [main] INFO - # onNext: 4
14:31:49.284 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxZip], described as [Example12_4.zipWith.checkpoint] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3417)
chapter12.Example12_5.main(Example12_5.java:17)
Error has been observed at the following site(s):
*__checkpoint(Example12_4.zipWith.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:17)
|_ checkpoint(Example12_4.map.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:19)
Original Stack Trace:
at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_5.main(Example12_5.java:20)
조금더 복잡한 예제로 봐보자.
@Slf4j
public class Example12_6 {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(2, 4, 6, 8);
Flux<Integer> other = Flux.just(1, 2, 3, 0);
Flux<Integer> multiplySource = divide(source, other).checkpoint();
Flux<Integer> plusSource = plus(multiplySource).checkpoint();
plusSource.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
private static Flux<Integer> divide(Flux<Integer> source, Flux<Integer> other) {
return source.zipWith(other, (x, y) -> x/y);
}
private static Flux<Integer> plus(Flux<Integer> source) {
return source.map(num -> num + 2);
}
}
1) 곱셈과 덧셈이라는 두가지 기능으로 나누어져있다.
Flux<Integer> multiplySource = divide(source, other).checkpoint();
Flux<Integer> plusSource = plus(multiplySource).checkpoint();
실행결과
14:37:03.311 [main] INFO - # onNext: 4
14:37:03.311 [main] INFO - # onNext: 4
14:37:03.311 [main] INFO - # onNext: 4
14:37:03.318 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_6.lambda$divide$2(Example12_6.java:26)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxZip] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
chapter12.Example12_6.main(Example12_6.java:15)
Error has been observed at the following site(s):
*__checkpoint() ⇢ at chapter12.Example12_6.main(Example12_6.java:15)
|_ checkpoint() ⇢ at chapter12.Example12_6.main(Example12_6.java:16)
Original Stack Trace:
at chapter12.Example12_6.lambda$divide$2(Example12_6.java:26)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_6.main(Example12_6.java:19)
어디서 에러가 발생했는지 발견하기 쉽지 않을때, checkpoint()를 각 Operator 체인에 추가하여 범위를 좁혀나가면서 에러 발생 지점을 찾을 수 있다.
log() Operator
Reactor Sequence 동작을 로그로 출력하여, 디버깅이 가능하다.
@Slf4j
public class Example12_7 {
public static Map<String, String> fruits = new HashMap<>();
static {
fruits.put("banana", "바나나");
fruits.put("apple", "사과");
fruits.put("pear", "배");
fruits.put("grape", "포도");
}
public static void main(String[] args) {
Flux.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
.map(String::toLowerCase)
.map(fruit -> fruit.substring(0, fruit.length() - 1))
.log()
// .log("Fruit.Substring", Level.FINE)
.map(fruits::get)
.subscribe(
log::info,
error -> log.error("# onError:", error));
}
}
1) map() Operator 다음으로 log()을 추가했다.
.map(fruit -> fruit.substring(0, fruit.length() - 1))
.log()
실행결과
14:44:06.010 [main] INFO - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
14:44:06.011 [main] INFO - | request(unbounded)
14:44:06.011 [main] INFO - | onNext(banana)
14:44:06.011 [main] INFO - 바나나
14:44:06.011 [main] INFO - | onNext(apple)
14:44:06.011 [main] INFO - 사과
14:44:06.012 [main] INFO - | onNext(pear)
14:44:06.012 [main] INFO - 배
14:44:06.012 [main] INFO - | onNext(melon)
14:44:06.014 [main] INFO - | cancel()
14:44:06.015 [main] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_7$$Lambda$38/0x0000000800124840] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_7.main(Example12_7.java:30)
오류 위치 분석
14:44:06.012 [main] INFO - | onNext(melon)
14:44:06.014 [main] INFO - | cancel()
실행결과의 마지막에 cancel() Signal이 출력되었다. 이는 두번째 map() Operator가 "melon"이라는 문자열을 emit했지만 두번째 map() Operator 이후의 어떤 지점에서 "melon" 문자열을 처리하는 중에 에러가 발생했음을 의미한다.
이후 로직에는 map() Operator가 있으므로, 이 지점에서 어떤 문제가 발생했는지를 예측할 수 있다.
로그의 로그 레벨을 바꿔보자.
.log("Fruit.Substring", Level.FINE)
실행결과
14:58:34.059 [main] DEBUG- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
14:58:34.060 [main] DEBUG- | request(unbounded)
14:58:34.060 [main] DEBUG- | onNext(banana)
14:58:34.060 [main] INFO - 바나나
14:58:34.061 [main] DEBUG- | onNext(apple)
14:58:34.061 [main] INFO - 사과
14:58:34.061 [main] DEBUG- | onNext(pear)
14:58:34.061 [main] INFO - 배
14:58:34.061 [main] DEBUG- | onNext(melon)
14:58:34.063 [main] DEBUG- | cancel()
14:58:34.065 [main] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_7$$Lambda$38/0x0000000800124840] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_7.main(Example12_7.java:31)
로그 레벨이 모두 DEBUG로 바뀌었고, 이 DEBUG 로그들이 두번째 map() Operator에서 발생한 Signal이라는 것을 쉽게 구분할 수 있도록 'Fruit.Substring'이라는 카테고리까지 표시해주고 있다.
Level.FINE
Java에서 지원하는 로그 레벨이며, Slf4j 로깅 프레임워크에서 사용하는 로그 레벨 중 DEBUG 레벨에 해당된다.
.log("Fruit.Substring", Level.FINE)
'Coding > Reactive' 카테고리의 다른 글
[리액티브 프로그래밍] 리액티브 스트림즈(Reactive Streams) 핵심 컴포넌트의 동작과정 (0) | 2023.06.11 |
---|---|
[리액티브 프로그래밍] Spring WebFlux의 요청 처리 흐름 (0) | 2023.05.02 |
[리액티브 프로그래밍] Reactor의 Context (0) | 2023.04.29 |
[리액티브 프로그래밍] Reactor의 Scheduler (0) | 2023.04.28 |
[리액티브 프로그래밍] Backpressure의 개념과 Backpressure 전략 (1) | 2023.04.24 |