[리액티브 프로그래밍] Backpressure의 개념과 Backpressure 전략

반응형
728x90
반응형

Backpressure

리액티브 프로그래밍에서의 배압, 즉 Backpressure은 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것이다. 

https://velog.io/@zini9188/Spring-WebFlux-Project-Reactor

Publisher가 빠르게 데이터를 emit하는 경우 Subscriber의 처리속도가 느려서 처리가 끝나기도 전에 계속해서 emit하게된다. 이렇게되면 처리되지 않고 대기 중인 데이터가 지속적으로 쌓이게되어 오버플로가 발생하거나 최악의 경우에는 시스템이 다운되는 문제가 발생한다. 이 문제를 해결하기 위한 수단이 바로 Backpressure이다.

 

 

Reactor에서의 Backpressure 처리 방식

첫번째 방법. Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청

숫자 1부터 1씩 증가한 5개의 데이터를 emit하도록 정의되었으며, BaseSubscriber가 데이터를 1개씩 보내주기를 Publisher에게 요청한다.

@Slf4j
public class Example8_1 {
    public static void main(String[] args) {
        Flux.range(1, 5)
            .doOnRequest(data -> log.info("# doOnRequest: {}", data))
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1);
                }

                @SneakyThrows
                @Override
                protected void hookOnNext(Integer value) {
                    Thread.sleep(2000L);
                    log.info("# hookOnNext: {}", value);
                    request(1);
                }
            });
    }
}

1) Reactor에서 Subscriber가 데이터 요청 개수를 직접 제어하기 위한 코드

.subscribe(new BaseSubscriber<Integer>() {

 

2) hookOnSubscribe()

Subscriber 인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request() 메서드를 호출해서 최초 데이터 요청 개수를 제어하는 역할을 한다.

@Override
protected void hookOnSubscribe(Subscription subscription) {
    request(1);
}

 

 

3) hookOnNext()

Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher가 emit한 데이터를 전달받아 처리한 후에 Publisher에게 또다시 데이터를 요청하는 역할을 한다. reuqest() 메서드를 호출해서 데이터 요청 개수를 제어한다.

@Override
protected void hookOnNext(Integer value) {
    Thread.sleep(2000L);
    log.info("# hookOnNext: {}", value);
    request(1);
}

 

실행결과
[main] INFO - # doOnRequest: 1
[main] INFO - # hookOnNext: 1
[main] INFO - # doOnRequest: 1
[main] INFO - # hookOnNext: 2
[main] INFO - # doOnRequest: 1
[main] INFO - # hookOnNext: 3
[main] INFO - # doOnRequest: 1
[main] INFO - # hookOnNext: 4
[main] INFO - # doOnRequest: 1
[main] INFO - # hookOnNext: 5
[main] INFO - # doOnRequest: 1

"# doOnRequest: 1"은 hookOnSubscribe() 메서드에서 request() 메서드를 호출함으로써 출력된 결과이고, 나머지 "# doOnRequest: 1"은 모두 hookOnNext() 메서드에서 request() 메서드를 호출함으로써 출력된 결과이다.

 

 

두번째 방법. Backpressure 전략 사용

Reactor에서는 Backpressure를 위한 다양한 전략을 제공한다.

종류 설명
IGNORE 전략 Backpressure를 적용하지 않는다.
ERROR 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시킨다.
DROP 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시킨다.
LATEST 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채운다.
BUFFER 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시킨다.

 

 

IGNORE 전략

Downstream에서의 backpressure 요청이 무시되기 때문에 IllegalStateException이 발생할 수 있다.

 

 

ERROR 전략

Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킨다.
Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.

@Slf4j
public class Example8_2 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureError()
            .doOnNext(data -> log.info("# doOnNext: {}", data))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2000L);
    }
}

1) interval()

0부터 1씩 증가한 숫자를 0.001초에 한번씩 아주 빠른 속도로 emit한다.

.interval(Duration.ofMillis(1L))

 

2) Subscriber 데이터 처리 속도 설정

Subscriber가 전달받은 데이터를 처리하는데 0.005초 시간이 걸리도록 설정한다.

.subscribe(data -> {
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

 

3) onBackpressureError()

ERROR 전략을 사용한다. 

.onBackpressureError()

 

 

4) doOnNext()

Publisher가 emit한 데이터를 확인하거나 추가적인 동작을 정의하는 용도로 사용되는데, 주로 디버깅 용도로 사용할 수 있다.

.doOnNext(data -> log.info("# doOnNext: {}", data))
.publishOn(Schedulers.parallel())

 

5) publishOn()

Reactor Sequence 중 일부를 별도 스레드에서 실행할 수 있도록 해주는 Operator이다. 

.publishOn(Schedulers.parallel())

 

실행결과
[parallel-2] INFO - # doOnNext: 0
[parallel-2] INFO - # doOnNext: 1
[parallel-2] INFO - # doOnNext: 2
[parallel-2] INFO - # doOnNext: 3
...
[parallel-1] INFO - # onNext: 254
[parallel-1] INFO - # onNext: 255
[parallel-1] ERROR- # onError
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
	at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739)
	at reactor.core.publisher.FluxOnBackpressureDrop$DropSubscriber.onNext(FluxOnBackpressureDrop.java:135)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

OverflowException이 발생하면서 Sequence가 종료된다. 참고로 OverflowException은 IllegalStateException을 상속한 하위 클래스이다.

 

 

DROP 전략

Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기중인 데이터 중에서 먼저 emit된 데이터부터 Drop시키는 전략이다. Drop된 데이터는 폐기된다.

@Slf4j
public class Example8_3 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2000L);
    }
}

1) DROP 전략을 사용한다.

.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))

onBackpressureDrop()은 DROP된 데이터를 파라미터로 전달받을 수 있기 때문에 Drop된 데이터가 폐기되기 전에 추가 작업을 진행할 수 있다.

 

실행결과
[parallel-1] INFO - # onNext: 0
[parallel-1] INFO - # onNext: 1
[parallel-1] INFO - # onNext: 2
...
[parallel-1] INFO - # onNext: 38
[parallel-1] INFO - # onNext: 39
[parallel-1] INFO - # dropped: 256
[parallel-1] INFO - # dropped: 257
[parallel-1] INFO - # dropped: 258
[parallel-1] INFO - # onNext: 40

첫번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 '256'이다. 

'256'부터 '258'까지의 구간동안 버퍼가 가득 차 있는 상태임을 알 수 있고, 258까지 Drop되기 때문에 Subscriber쪽에서는 숫자 40부터 전달받아 처리한다.

 

이처럼 Backpressure DROP 전략을 사용하면 버퍼가 가득 찬 상태에서는 버퍼가 비워질 때까지 데이터를 Drop한다.

 

 

LATEST 전략

Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략이다.

 

DROP 전략과의 차이

DROP 저냙은 버퍼가 가득 찰 경우 버퍼 밖에서 대기중인 데이터를 하나씩 차례대로 DROP 하면서 폐기하지만, LATEST 전략은 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨두고 나머지 데이터를 폐기한다.

 

@Slf4j
public class Example8_4 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureLatest()
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2000L);
    }
}

1) LATEST 전략을 사용한다.

.onBackpressureLatest()

 

실행결과
[parallel-1] INFO - # onNext: 0
[parallel-1] INFO - # onNext: 1
[parallel-1] INFO - # onNext: 2
...
[parallel-1] INFO - # onNext: 243
[parallel-1] INFO - # onNext: 244
[parallel-1] INFO - # onNext: 245
[parallel-1] INFO - # onNext: 246
[parallel-1] INFO - # onNext: 247
[parallel-1] INFO - # onNext: 248
[parallel-1] INFO - # onNext: 249
[parallel-1] INFO - # onNext: 250
[parallel-1] INFO - # onNext: 251
[parallel-1] INFO - # onNext: 252
[parallel-1] INFO - # onNext: 253
[parallel-1] INFO - # onNext: 254
[parallel-1] INFO - # onNext: 255
[parallel-1] INFO - # onNext: 1286

Subscriber가 숫자 '255' 출력후 바로 '1286'을 출력하고있다. 이는 버퍼가 가득 찼다가 버퍼가 다시 비워지는 시간동안 emit되는 데이터가 가장 최근에 emit된 데이터가 된 후, 다음 데이터가 emit되면 다시 폐기되는 과정을 반복하기 때문이다.

 

 

BUFFER 전략

컴퓨터 시스템에서의 버퍼

입출력을 수행하는 장치들간의 속도 차이를 조절하기 위해 입출력 장치 중간에 위치해서 데이터를 어느정도 쌓아 두었다가 전송하는것

 

Backpressure BUFFER 전략도 이와 비슷하다. Backpressure BUFFER 전략은 아래의 전략들을 제공한다.

  • 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략
  • 버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략
  • 버퍼가 가득 차면 에러를 발생시키는 전략

 

버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략

위에서 공부한 DROP 전략과 LATEST 전략을 버퍼 바깥쪽의 데이터를 폐기했다면, BUFFER 전략은 BUFFER 안에 있는 데이터를 폐기한다. 

 

DROP_LATEST 전략

Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 Drop하여 폐끼한 후, 이렇게 확보된 공간에 emit된 데이터를 채우는 전략이다.

@Slf4j
public class Example8_5 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(300L))
            .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
            .onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_LATEST)
            .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
            .publishOn(Schedulers.parallel(), false, 1)
            .subscribe(data -> {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2500L);
    }
}

1) onBackpressureBuffer()

.onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_LATEST)
  • 첫번째 파라미터 : 버퍼의 최대 용량을 2로 설정
  • 두번째 파라미터 : 버퍼 오버플로가 발생했을때, DROP되는 데이터를 전달받아 후처리 수행
  • 세번재 파라미터 : 적용할 Backpressure 전략 설정

 

실행결과
[main] DEBUG- Using Slf4j logging framework
[parallel-2] INFO - # emitted by original Flux: 0
[parallel-2] INFO - [ # emitted by Buffer: 0 ]
[parallel-2] INFO - # emitted by original Flux: 1
[parallel-2] INFO - # emitted by original Flux: 2
[parallel-2] INFO - # emitted by original Flux: 3
[parallel-2] INFO - ** Overflow & Dropped: 3 **
[parallel-1] INFO - # onNext: 0
[parallel-1] INFO - [ # emitted by Buffer: 1 ]
[parallel-2] INFO - # emitted by original Flux: 4
[parallel-2] INFO - # emitted by original Flux: 5
[parallel-2] INFO - ** Overflow & Dropped: 5 **
[parallel-2] INFO - # emitted by original Flux: 6
[parallel-2] INFO - ** Overflow & Dropped: 6 **
[parallel-1] INFO - # onNext: 1
[parallel-1] INFO - [ # emitted by Buffer: 2 ]
[parallel-2] INFO - # emitted by original Flux: 7
  • 숫자 0이 emit되고, 버퍼에 잠시 채워진 다음 버퍼에서 다시 emit된다.
  • 원본 Flux가 emit한 숫자 0을 Subscriber가 처리하기까지 1초 정도의 시간이 걸린다.
  • Subscriber가 숫자 0을 처리하는 1초의 시간동안 원본 Flux에서는 0.3초에 한번씩 숫자 1, 2를 emit한다. 버퍼의 최대 용량이 2이기 때문에 버퍼에는 1, 2가 채워진다.
    • 버퍼 : [2, 1]
  • 0.3초 뒤에 원본 Flux에서 숫자 3을 emit하는데, 버퍼 안에 3이 채워지는 순간, 버퍼가 이미 꽉 차있으므로 숫자 3은 Drop된다.
  • Subscriber가 숫자 1을 처리하는 1초의 시간동안 원본 Flux에서는 숫자 4를 emit하여 버퍼에 채워진다.
    • 버퍼 : [4,2]

위와 같은 단계로 버퍼가 꽉 차있으면 버퍼 안에 제일 최근에(나중에) 채워진 숫자가 DROP된다.

 

 

DROP_OLDEST 전략

Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략이다. 

 

@Slf4j
public class Example8_6 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(300L))
            .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
            .onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_OLDEST)
            .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
            .publishOn(Schedulers.parallel(), false, 1)
            .subscribe(data -> {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2500L);
    }
}

 1) onBackpressureBuffer()

.onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_OLDEST)
  • 첫번째 파라미터 : 버퍼의 최대 용량을 2로 설정
  • 두번째 파라미터 : 버퍼 오버플로가 발생했을때, DROP되는 데이터를 전달받아 후처리 수행
  • 세번재 파라미터 : 적용할 Backpressure 전략 설정

 

실행결과
[parallel-2] INFO - # emitted by original Flux: 0
[parallel-2] INFO - [ # emitted by Buffer: 0 ]
[parallel-2] INFO - # emitted by original Flux: 1
[parallel-2] INFO - # emitted by original Flux: 2
[parallel-2] INFO - # emitted by original Flux: 3
[parallel-2] INFO - ** Overflow & Dropped: 1 **
[parallel-1] INFO - # onNext: 0
[parallel-1] INFO - [ # emitted by Buffer: 2 ]
[parallel-2] INFO - # emitted by original Flux: 4
[parallel-2] INFO - # emitted by original Flux: 5
[parallel-2] INFO - ** Overflow & Dropped: 3 **
[parallel-2] INFO - # emitted by original Flux: 6
[parallel-2] INFO - ** Overflow & Dropped: 4 **
[parallel-1] INFO - # onNext: 2
[parallel-1] INFO - [ # emitted by Buffer: 5 ]
[parallel-2] INFO - # emitted by original Flux: 7
  • 원본 Flux에서 숫자 0이 emit되고, 버퍼에 잠시 채워진 다음 버퍼에서 다시 emit된다. 
  • 원본 Flux가 emit한 숫자 0을 Subscriber가 처리하기까지 1초 정도의 시간이 걸린다.
  • Subscriber가 숫자 0을 처리하는 1초의 시간동안 원본 Flux에는 0.3초에 한번씩 숫자 1, 2를 emit한다. 버퍼의 최대 용량이 2이므로 이 시점에 버퍼에는 1,2가 채워진다.
    • 버퍼 : [2, 1]
  • 0.3초 뒤에 원본 Flux에서 숫자 3을 emit한다. 버퍼 안에 3이 채워지는 순간, 버퍼는 이미 꽉 차있기 때문에 버퍼 오버플로가 발생하고 버퍼 안에 있는 데이터 중에서 가장 오래된 데이터인 숫자 1이 Drop된다. 
    • 버퍼 : [3, 2]
  • 버퍼에서 숫자 2가 emit된다. 
    • 버퍼 : [3]

위와 같은 단계로 버퍼가 꽉 차있으면 버퍼 안에 제일 오래된(먼저) 채워진 숫자가 DROP된다.

 

 

반응형

Designed by JB FACTORY