[리액티브 프로그래밍] Cold Sequence vs Hot Sequence

반응형
728x90
반응형

Cold와 Hot의 의미

지난 RxJava 공부 시기에 위 개념을 공부했었는데 다시보면 기억하기가 쉽지 않다. 리액티브 프로그래밍을 공부하면서 다시 나온 개념을 기억하고자 포스팅한다.

 

Cold와 Hot의 의미를 모르는 사람은 없을 것이다. 대표적인 예시로 Hot Swapt이나 Hot Deploy를 보자.

 

Hot Swap

컴퓨터 시스템의 전원이 켜져 있는 상태에서 디스크 등의 장치를 교체할 경우 시스템을 재시작하지 않고서도 바로 장치를 인식한다.

 

Hot Deploy

서버를 재시작하지 않고서 응용 프로그램의 변경 사항을 적용한다.

 

공통점을 찾아보자. Hot은 무언가 처음부터 다시 시작하지 않고, 같은 작업이 반복하지 않는 느낌이다. 이와 반대인 Cold는 처음부터 새로 시작해야하고, 새로 시작하기 때문에 같은 작업이 반복된다.

 

리액티브 프로그래밍에서의 Cold, Hot의 의미를 위의 내용을 바탕으로 쉽게 이해해보자. 다시 쉽게 말하자면 'Cold'는 무언가를 새로 시작하고, 'Hot'은 무언가를 새로 시작하지 않는다.

 

 

Sequence

Publisher가 emit하는 데이터의 연속적인 흐름을 정의해놓은 것이다. 코드로 표현하면 Operator 체인 형태로 정의된다.

 

 

Cold Sequence

Subscriber가 구독할 때마다 데이터의 흐흠이 처음부터 시작되는 Sequence를 말한다.

https://velog.io/@korea3611/%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88Reactive-Streams%EB%9E%80

위 그림으로 이해해보자. 위에 있는 Subscriber A가 구독을 하면 Publisher는 4개(1, 3, 5, 7) 데이터를 emit한다. 

그리고 아래에 있는 Subscriber B가 구독을 해도 Publisher는 4개(1, 3, 5, 7) 데이터를 emit 한다. 

 

결론적으로 Subscriber A와 Subscriber B의 구독 시점이 다르지만 Subscriber A, B는 모두 동일한 데이터를 받게된다.

 

즉, Cold Sequence란, 구독 시점이 달라도 구독을 할 때마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름을 말한다.

 

예제코드
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.util.Arrays;

@Slf4j
public class Example7_1 {
    public static void main(String[] args) throws InterruptedException {

        Flux<String> coldFlux =
                Flux
                    .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                    .map(String::toLowerCase);

        coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
        System.out.println("----------------------------------------------------------------------");

        Thread.sleep(2000L);

        coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
    }
}

1) fromIterable() Operator를 사용하여 List로 전달받은 데이터 소스를 emit한다.

.fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))

 

실행결과
23:07:05.739 [main] INFO - # Subscriber1: korea
23:07:05.740 [main] INFO - # Subscriber1: japan
23:07:05.740 [main] INFO - # Subscriber1: chinese
----------------------------------------------------------------------
23:07:07.746 [main] INFO - # Subscriber2: korea
23:07:07.749 [main] INFO - # Subscriber2: japan
23:07:07.750 [main] INFO - # Subscriber2: chinese

예상한대로, 구독이 발생할때마다 emit된 데이터(korea, japan, chinese)를 처음부터 다시 전달한다.

 

 

Hot Sequence

Cold Sequence는 위의 설명으로 충분히 이해할 수 있을것이다. Hot Sequence는 Cold Sequence의 반대로 개념으로 이해하면 된다. Hot Sequence의 경우 구독이 발생한 시점 이전에 Publisher로부터 emit된 데이터는 Subscriber가 전달받지 못하고, 구독이 발생한 시점 이후에 emit된 데이터만 받을 수 있다.

 

https://velog.io/@korea3611/%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88Reactive-Streams%EB%9E%80

총 3번의 구독이 발생했다. 첫번째 구독한 Subscriber A, 두번째 구독한 Subscriber B, 세번째 구독한 Subscriber C는 각각 다른 데이터를 받는다. 즉, 구독이 아무리 많이 발생해도 Publisher가 데이터를 처음부터 emit하지 않는다는 것을 의미한다.

 

Subscriber A
1, 3, 5, 7

Subscriber B
5, 7

Subscriber C
7

 

위와 같이 총 3번의 구독으로 인해 데이터를 받게된다. Hot Sequence의 개념을 다시 정리해보면, Publisher가 데이터를 emit하는 과정이 한번만 일어나고, Subscriber가 각각의 구독 시점 이후에 emit된 데이터만 전달받는 데이터의 흐름을 말한다.

 

예제코드
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

@Slf4j
public class Example7_2 {
    public static void main(String[] args) throws InterruptedException {
        String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

        log.info("# Begin concert:");
        Flux<String> concertFlux =
                Flux
                    .fromArray(singers)
                    .delayElements(Duration.ofSeconds(1))
                    .share();

        concertFlux.subscribe(
                singer -> log.info("# Subscriber1 is watching {}'s song", singer)
        );

        Thread.sleep(2500);

        concertFlux.subscribe(
                singer -> log.info("# Subscriber2 is watching {}'s song", singer)
        );

        Thread.sleep(3000);
    }
}

1) delayElements()

데이터 소스로 입력된 각 데이터의 emit을 일정시간 동안 지연시키는 Operator이다. 데이터의 emit이 1초씩 지연될 수 있도록한다.

디폴트 스레드 스케줄러가 parallel 이기 때문에 위 예제코드는 main 스레드, parallel-1, parallel-2, parallel-3, parallel-4, parallel-5 스레드가 실행된다.

.delayElements(Duration.ofSeconds(1))

 

2) share()

Cold Sequence를 Hot Sequence로 동작하게 해주는 Operator이다. 

원본 Flux를 멀티캐스트(공유)하는 새로운 Flux를 리턴한다는 뜻으로 이해하면 된다. 

.share();

아래 3개의 메서드는 결국 Flux 객체를 모두 리턴하는데, 처음 fromArray()에서 처음으로 리턴하는 Flux가 '원본 Flux'가 된다.

Flux
    .fromArray(singers)
    .delayElements(Duration.ofSeconds(1))
    .share();

이 원본 Flux를 멀티캐스트(공유)한다는 의미는 여러 Subscriber가 하나의 원본 Flux를 공유한다는 의미이다. 그러므로 하나의 원본 Flux를 공유해서 다같이 사용하기 때문에 Hot Sequence가 되는 것이다.

 

어떤 Subscriber가 이 원본 Flux를 먼저 구독하면 데이터 emit을 시작하게되고, 그 이후에 다른 Subscriber가 구독하는 시점에는 원본 Flux에서 이미 emit된 데이터를 전달받을 수 없게된다.

 

실행결과
23:12:24.717 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
23:12:25.722 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
23:12:26.729 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
23:12:26.729 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
23:12:27.733 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
23:12:27.733 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
23:12:28.738 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
23:12:28.739 [parallel-5] INFO - # Subscriber2 is watching Singer E's song

두번째 구독의 경우, 원본 Flux가 emit한 데이터 중에서 Singer A, Singer B는 전달받지 못했다. 

 

 

HTTP 요청과 응답에서의 Cold Sequence

예제코드
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;

@Slf4j
public class Example7_3 {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        Mono<String> mono = getWorldTime(worldTimeUri);
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
}

1) getWorldTime() 메서드 호출

리턴 값으로 Mono를 전달받는다. 

Mono<String> mono = getWorldTime(worldTimeUri);

 

2) 총 2번의 구독이 일어난다.

mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);

mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);

첫번째 구독이 발생하고, 2초의 지연 시간 후에 두번째 구독이 발생한다.

구독이 발생할때마다 데이터의 emit 과정이 처음부터 새로 시작되는 Cold Sequence의 특징으로 인해, 두번의 구독이 발생함으로써 두번의 새로운 HTTP 요청이 발생한다.

 

 

HTTP 요청과 응답에서의 Hot Sequence

예제코드
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;

@Slf4j
public class Example7_4 {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        Mono<String> mono = getWorldTime(worldTimeUri).cache();
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
}

1) cache() 메서드

cache() 메서드로 인해 Cold Sequence가 Hot Sequence로 동작하게 된다.

Mono<String> mono = getWorldTime(worldTimeUri).cache();

cache() Operator는 Cold Sequence로 동작하는 Mono를 Hot Sequence로 변경해주고 emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다. 

 

결과적으로 캐시된 데이터를 전달하기 때문에 구독이 발생할때마다 Subscriber는 동일한 데이터를 전달받게 된다.

 

 

반응형

Designed by JB FACTORY