[리액티브 프로그래밍] Reactor의 Context

반응형
728x90
반응형

Context

the situation, events, or onformation that are related to something and that help you understand it

'어떤 것을 이해하는데 도움이 될만한 관련 정보나 이벤트, 상황'로 해석된다. 즉, Context는 어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보라고 볼 수 있다.

 

 

프로그래밍 세계에서의 Context 예시

1) ServletContext는 Servlet이 Setvlet Container와 통신하기 위해서 필요한 정보를 제공하는 인터페이스다.

2) Spring Framework에서 ApplicationContext는 애프리케이션의 정보를 제공하는 인터페이스다.

3) Spring Security에서 SecurityContextHolder는 SecurityContext를 관리하는 주체인데, 여기서 SecurityContext는 애플리케이션 사용자의 인증 정보를 제공하는 인터페이스다.

 

프로그래밍 세계에서의 Context는 어떠한 상황을 처리하거나 해결하기 위해 필요한 정보를 제공하는 어떤 것(Java에서는 인터페이스 또는 클래스)으로 이해할 수 있다.

 

 

Reactor에서의 Context

A key/value store that is propagated between components such as operators via the context protocol

즉, Reactor의 Context는 Operator 같은 Reactor 구성요소 간에 전파되는 key/value 형태의 저장소라고 정의한다.

 

전파

Downstream에서 Upstream으로 Context가 전파되어 Operator 체인상의 각 Operator가 해당 Context의 정보를 동일하게 이용할 수 있음을 의미

 

Reactor의 Context는 Subscriber와 매핑된다. 즉, 구독이 발생할때마다 해당 구독과 연결된 하나의 Context가 생긴다.

 

 

Context 예제

@Slf4j
public class Example11_1 {
    public static void main(String[] args) throws InterruptedException {
        Mono
            .deferContextual(ctx ->
                Mono
                    .just("Hello" + " " + ctx.get("firstName"))
                    .doOnNext(data -> log.info("# just doOnNext : {}", data))
            )
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.parallel())
            .transformDeferredContextual(
                    (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
            )
            .contextWrite(context -> context.put("lastName", "Jobs"))
            .contextWrite(context -> context.put("firstName", "Steve"))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

1) contextWrite()

contextWrite() 파라미터로 람다 표현식을 전달한다. 

.contextWrite(context -> context.put("lastName", "Jobs"))
.contextWrite(context -> context.put("firstName", "Steve"))

 

위 ContextWrite() 내부를 보자.

Mono.java
...
public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {
    return onAssembly(new MonoContextWrite<>(this, contextModifier));
}
...

contextWrite() Operator의 파라미터를 Function 타입의 함수형 인터페이스인데, 람다 표현식으로 표현할 경우 람다 파라미터 타입이 Context이고, 리턴 값도 Context이다.

 

2) Context의 API 중 put()

실제로 데이터를 쓰는 동작이다. 이제 구독이 발생하면 Context에는 "Steve"와 "Jobs"라는 두개의 데이터가 저장될 것이다.

.contextWrite(context -> context.put("lastName", "Jobs"))
.contextWrite(context -> context.put("firstName", "Steve"))

 

3) Context에 쓰인 데이터 읽기

  • 첫번째 방법. 원본 데이터 소스 레벨에서 읽는 방식
  • 두번째 방법. Operator 체인의 중간에서 읽는 방식

아래 코드는 첫번째 방식을 사용한다. 원본 데이터 소스 레벨에서 Context에 쓰인 데이터를 읽기 위해서 deferContextual() Operator을 사용해야한다.

.deferContextual(ctx ->
    Mono
        .just("Hello" + " " + ctx.get("firstName"))
        .doOnNext(data -> log.info("# just doOnNext : {}", data))
)

 

위 deferContextual() 내부를 보자.

Mono.java
...
public static <T> Mono<T> deferContextual(Function<ContextView, ? extends Mono<? extends T>> contextualMonoFactory) {
    return onAssembly(new MonoDeferContextual<>(contextualMonoFactory));
}
...

람다 파라미터(ctx)는 Context 타입의 객체가 아닌 ContextView 타입의 객체다. 

Context에 데이터를 쓸 때는 Context를 사용하지만, Context에 저장된 데이터를 읽을때는 ContextView를 사용한다.

 

4) ContextView의 get()

.deferContextual(ctx ->
    Mono
        .just("Hello" + " " + ctx.get("firstName"))
        .doOnNext(data -> log.info("# just doOnNext : {}", data))
)

// 익명클래스
.deferContextual(new Function<ContextView, Mono<? extends String>>() {
         @Override
         public Mono<? extends String> apply(ContextView ctx) {
             return Mono
                     .just("Hello" + " " + ctx.get("firstName"))
                     .doOnNext(data -> log.info("# just doOnNext : {}", data));
         }
     }
)

ContextView의 get() 메서드를 통해서 Context에 저장된 'firstName' 키에 해당하는 값을 읽어온다. 

 

5) Operator 체인의 중간에서 Context 데이터 읽기

.transformDeferredContextual(
        (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
)

'lastName' 키에 해당하는 값을 읽어온다.

 

 

실행결과
09:32:30.009 [boundedElastic-1] INFO - # just doOnNext : Hello Steve
09:32:30.017 [parallel-1] INFO - # onNext: Hello Steve Jobs

실행결과를 보면 Context에 저장된 데이터를 정상적으로 두번 읽어온다. 

그리고 subscribeOn()과 publishOn()을 사용해서 데이터를 emit하는 스레드와 데이터를 처리하는 스레드를 분리했기 때문에 Context에서 데이터를 읽어 오는 작업을 각각 다른 스레드에서 수행했다.

이처럼 Reactor에서는 Operator 체인상의 서로 다른 스레드들이 Context의 저장된 데이터에 손쉽게 접근할 수 있다.

 

 

Context 관련 API

Context API 설명
put(key, value) key/value 형태로 Context에 값을 쓴다. Context에 하나의 데이터를 쓰는 API
of(key1, value2, key2, value2, ...) key/value 형태로 Context에 여러 개의 값을 쓴다. 한번의 API 호출로 여러개의 데이터를 Context에 쓸수 있는데, 최대 5개의 데이터를 파라미터로 입력할 수 있다.

6개 이상의 데이터를 쓰기 위해서는 아래의 putAll()을 쓰면된다.
putAll(ContextView) 현재 Context와 파라미터로 입력된 ContextView를 merge 한다. Context의 데이터와 파라미터로 입력된 ContextView의 데이터를 합친 후, 새로운 Context를 생성한다.
delete(key) Context에서 key에 해당하는 value를 삭제한다. key에 해당되는 데이터를 삭제한다.

 

예제코드

@Slf4j
public class Example11_3 {
    public static void main(String[] args) throws InterruptedException {
        final String key1 = "company";
        final String key2 = "firstName";
        final String key3 = "lastName";

        Mono
            .deferContextual(ctx ->
                    Mono.just(ctx.get(key1) + ", " + ctx.get(key2) + " " + ctx.get(key3))
            )
            .publishOn(Schedulers.parallel())
            .contextWrite(context ->
                    context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
            )
            .contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# onNext: {}" , data));

        Thread.sleep(100L);
    }
}

1) 1개의 데이터를 쓴다.

.contextWrite(context -> context.put(key1, "Apple"))

 

2) 나머지 2개의 데이터를 쓴다.

.contextWrite(context ->
        context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
)

 

3) Context.of()를 사용해서 putAll() 파라미터로 전달한다.

context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())

 

4) readOnly()

context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())

 

▶ Context.putAll()

default Context putAll(ContextView other) {
    if (other.isEmpty()) return this;

    ...
    return newContext;
}

 

▶ Context.of()

static Context of(Object key1, Object value1,
        Object key2, Object value2) {
    return new Context2(key1, value1, key2, value2);
}

 

 

putAll()의 파라미터는 contextView 객체여야 하는데, Context.of()의 리턴 값은 새로운 Context 객체다. 

따라서 이 Context 객체를 ContextView 객체로 변환해 주어야 하는데, 이 작업은 readOnly() API를 통해 이루어진다. readOnly()는 Context를 읽기 작업만 가능한 ContextView로 변환해주는 API다.

 

 

실행결과
09:50:28.113 [parallel-1] INFO - # onNext: Apple, Steve Jobs

 

 

ContextView 관련 API

Context에 저장된 데이터를 읽으려면 ContextView API를 사용해야한다. 

ContextView API 설명
get(key) ContextView에서 key에 해당하는 value를 반환한다.
getOrEmpty(key) ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다.
getOrDefault(key, default value) ContextView에서 key에 해당하는 value를 가져온다.
key에 해당하는 value가 없으면 default value를 가져온다.
hasKey(key) ContextView에서 특정 key가 존재하는지를 확인한다.
isEmpty() Context가 비어있는지 확인한다.
size() Context 내에 있는 key/value의 개수를 반환한다.

 

예제코드

@Slf4j
public class Example11_4 {
    public static void main(String[] args) throws InterruptedException {
        final String key1 = "company";
        final String key2 = "firstName";
        final String key3 = "lastName";

        Mono
            .deferContextual(ctx ->
                    Mono.just(ctx.get(key1) + ", " +
                            ctx.getOrEmpty(key2).orElse("no firstName") + " " +
                            ctx.getOrDefault(key3, "no lastName"))
            )
            .publishOn(Schedulers.parallel())
            .contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# onNext: {}" , data));

        Thread.sleep(100L);
    }
}

1) 데이터 쓰기

.contextWrite(context -> context.put(key1, "Apple"))

 

2) 데이터 읽기

.deferContextual(ctx ->
        Mono.just(ctx.get(key1) + ", " +
                ctx.getOrEmpty(key2).orElse("no firstName") + " " +
                ctx.getOrDefault(key3, "no lastName"))
)

3개의 키에 해당하는 데이터를 읽는데, 'company' 키에 해당하는 데이터는 Context에 저장되었지만, 그 외는 모두 Context에 저장되지 않았다. 따라서 값이 있는 경우를 제외하고 모두 디폴트 값을 전달받는다.

10:40:25.574 [parallel-1] INFO - # onNext: Apple, no firstName no lastName

 

 

Context의 특징

Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.

@Slf4j
public class Example11_5 {
    public static void main(String[] args) throws InterruptedException {
        final String key1 = "company";

        Mono<String> mono = Mono.deferContextual(ctx ->
                        Mono.just("Company: " + " " + ctx.get(key1))
                )
                .publishOn(Schedulers.parallel());


        mono.contextWrite(context -> context.put(key1, "Apple"))
                .subscribe(data -> log.info("# subscribe1 onNext: {}", data));

        mono.contextWrite(context -> context.put(key1, "Microsoft"))
                .subscribe(data -> log.info("# subscribe2 onNext: {}", data));

        Thread.sleep(100L);
    }
}

1) 첫번째 구독이 발생

Context에 'Apple'이라는 회사명이 저장된다.

mono.contextWrite(context -> context.put(key1, "Apple"))
       .subscribe(data -> log.info("# subscribe1 onNext: {}", data));

 

2) 두번째 구독이 발생

Context에 'Microsoft'라는 회사명이 저장된다.

mono.contextWrite(context -> context.put(key1, "Microsoft"))
       .subscribe(data -> log.info("# subscribe2 onNext: {}", data));

 

위 두개의 데이터가 하나의 Context에 저장될 것 같지만, Context는 구독별로 연결되는 특징이 있기 때문에 구독이 발생할 때마다 해당하는 하나의 Context가 하나의 구독에 연결된다.

 

실행결과
10:44:17.536 [parallel-2] INFO - # subscribe2 onNext: Company:  Microsoft
10:44:17.536 [parallel-1] INFO - # subscribe1 onNext: Company:  Apple

 

 

 

Context는 Operator 체인의 아래에서 위로 전파된다.

동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다. Context에 2개의 데이터 저장해보자.

@Slf4j
public class Example11_6 {
    public static void main(String[] args) throws InterruptedException {
        String key1 = "company";
        String key2 = "name";

        Mono
            .deferContextual(ctx ->
                Mono.just(ctx.get(key1))
            )
            .publishOn(Schedulers.parallel())
            .contextWrite(context -> context.put(key2, "Bill"))
            .transformDeferredContextual((mono, ctx) ->
                    mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
            )
            .contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

1) 'Apple' 이라는 회사명을 저장한다.

.contextWrite(context -> context.put(key1, "Apple"))

 

2) 'Bill' 이라는 이름을 저장한다.

.contextWrite(context -> context.put(key2, "Bill"))

 

3) ContextView 객체를 통해 'company' 키에 해당하는 값을 Downstream으로 emit한다.

.deferContextual(ctx ->
    Mono.just(ctx.get(key1))
)

 

실행결과
10:58:38.856 [parallel-1] INFO - # onNext: Apple, Steve

Context에 분명 'Bill'을 저장했는데, 'Steve'가 출력되었다.

이 이유는 Operator 체인상의 아래에서 위로 전파되는 특징이 있기 때문이다.

 

다시 위의 코드의 흐름을 보자.

1) 'Apple' 이라는 회사명을 저장한다.

.contextWrite(context -> context.put(key1, "Apple"))

 

2) 그다음 아래 코드가 실행되는데, 해당 시점에는 'name' 키에 해당하는 값이 Context에 없다.

.transformDeferredContextual((mono, ctx) ->
        mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
)

따라서 getOrDefault() API의 디폴트 값으로 지정한 "Steve"가 Subscriber에게 전달된 것이다. 이때 만약 getOrDefault()가 아닌 get()이였다면 NoSuchElementException이 발생하게된다. 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록 contextWrite()을 Operator 체인의 맨 마지막에 둬야한다. 

 

 

Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.

Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다.

@Slf4j
public class Example11_7 {
    public static void main(String[] args) throws InterruptedException {
        String key1 = "company";
        Mono
            .just("Steve")
//            .transformDeferredContextual((stringMono, ctx) ->
//                    ctx.get("role"))
            .flatMap(name ->
                Mono.deferContextual(ctx ->
                    Mono
                        .just(ctx.get(key1) + ", " + name)
                        .transformDeferredContextual((mono, innerCtx) ->
                                mono.map(data -> data + ", " + innerCtx.get("role"))
                        )
                        .contextWrite(context -> context.put("role", "CEO"))
                )
            )
            .publishOn(Schedulers.parallel())
            .contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

1) Opertator 체인의 제일 마지막에서 Context에 값을 쓰고있다.

.contextWrite(context -> context.put(key1, "Apple"))

 

2) flatMap()이라는 Operator 내부에 존재하는 Operator 체인에서 값을 쓰고있다.

.flatMap(name ->
    Mono.deferContextual(ctx ->
        Mono
            .just(ctx.get(key1) + ", " + name)
//            .transformDeferredContextual((mono, innerCtx) ->
//                    mono.map(data -> data + ", " + innerCtx.get("role"))
            )
            .contextWrite(context -> context.put("role", "CEO"))
    )
)

여기서 flatMap() Operator 내부에 있는 Sequence를 Inner Sequence라고 하는데, Inner Sequence에서는 Inner Sequence 바깥쪽 Sequence에 연결된 Context의 값을 읽을 수 있다.

 

실행결과
11:12:56.321 [parallel-1] INFO - # onNext: Apple, Steve, CEO

바깥쪽 Sequence에 연결된 Context에 쓴 값인 'Apple'을 Inner Sequence에서 읽을 수 있다.

 

 

아래 부분의 주석을 해제해보자.

.transformDeferredContextual((stringMono, ctx) ->
                    ctx.get("role"))

 

전체 예제코드
@Slf4j
public class Example11_7 {
    public static void main(String[] args) throws InterruptedException {
        String key1 = "company";
        Mono
            .just("Steve")
            .transformDeferredContextual((stringMono, ctx) ->
                    ctx.get("role"))
            .flatMap(name ->
                Mono.deferContextual(ctx ->
                    Mono
                        .just(ctx.get(key1) + ", " + name)
                        .transformDeferredContextual((mono, innerCtx) ->
                                mono.map(data -> data + ", " + innerCtx.get("role"))
                        )
                        .contextWrite(context -> context.put("role", "CEO"))
                )
            )
            .publishOn(Schedulers.parallel())
            .contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}

 

실행결과 - 오류 발생
Caused by: java.util.NoSuchElementException: Context does not contain key: role

Context에 'role'이라는 키가 없기 때문에 NoSuchElementExcpetion이 발생했다. Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없음을 알 수 있다.

 

 

 

Context 활용 예제

인증된 도서 관리자가 신규 도서를 등록하기 위해 도서 정보와 인증 토큰을 서버로 전송하는 예제다.

@Slf4j
public class Example11_8 {
    public static final String HEADER_AUTH_TOKEN = "authToken";
    public static void main(String[] args) {
        Mono<String> mono =
                postBook(Mono.just(
                        new Book("abcd-1111-3533-2809"
                                , "Reactor's Bible"
                                ,"Kevin"))
                )
                .contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));

        mono.subscribe(data -> log.info("# onNext: {}", data));

    }

    private static Mono<String> postBook(Mono<Book> book) {
        return Mono
                .zip(book,
                        Mono
                            .deferContextual(ctx ->
                                    Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
                )
                .flatMap(tuple -> {
                    String response = "POST the book(" + tuple.getT1().getBookName() +
                            "," + tuple.getT1().getAuthor() + ") with token: " +
                            tuple.getT2();
                    return Mono.just(response); // HTTP POST 전송을 했다고 가정
                });
    }
}

@AllArgsConstructor
@Data
class Book {
    private String isbn;
    private String bookName;
    private String author;
}

1) 도서정보 Book 전송

Mono<Book> 객체를 postBook() 메서드의 파라미터로 전달한다.

 postBook(Mono.just(
                new Book("abcd-1111-3533-2809"
                        , "Reactor's Bible"
                        ,"Kevin"))
        )

 

2) zip() Operator

Mono<Book> 객체와 인증 토큰 정보를 의미하는 Mono<String> 객체를 하나의 Mono로 합친다.

이때 합쳐진 Mono는 Mono<Tuple2>의 객체가 된다.

.zip(book,
        Mono
            .deferContextual(ctx ->
                    Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)

 

3) flatMap() Operator

도서 정보를 전송한다.

.flatMap(tuple -> {
    String response = "POST the book(" + tuple.getT1().getBookName() +
            "," + tuple.getT1().getAuthor() + ") with token: " +
            tuple.getT2();
    return Mono.just(response); // HTTP POST 전송을 했다고 가정
});

 

핵심 로직

1) 위 코드에서 Context에 저장한 인증 토큰을 2개의 Mono를 합치는 과정에서 다시 Context로부터 읽어와서 사용한다.

.contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));

...

.zip(book,
        Mono
            .deferContextual(ctx ->
                    Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)

 

2) Context 접근

 Mono<String> mono =
        postBook(Mono.just(
                new Book("abcd-1111-3533-2809"
                        , "Reactor's Bible"
                        ,"Kevin"))
        )
        .contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));

mono.subscribe(data -> log.info("# onNext: {}", data));

Mono가 어떤 과정을 거치든 상관없이 가장 마지막에 리턴된 Mono를 구독하기 직전에 contextWrite()으로 데이터를 저장하기 때문에 Operator 체인의 위쪽으로 전파되고, Operator 체인 어느 위치에서든 Context에 접근할 수 있다.

 

결론적으로, Context는 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는데 적합하다.

 

 

반응형

Designed by JB FACTORY