이 글은 spring.io의 블로그에 개재된 Notes on Reactive Programming Part II: Writing Some Code의 번역글로 이전 글 다음으로 연재된 두 번째 글입니다. Java 리액티브 프로그래밍 프레임워크인 Reactor를 이용하여 리액티브 프로그램을 작성해보고, 그것이 어떤 개념을 이용하고 있으며 실제로 어떻게 동작하는지 자세히 설명하고 있습니다. 이 글을 통해 Reactor를 이용한 Java 리액티브 프로그래밍의 맛을 느껴보시기 바랍니다.

이 글을 읽기 위해 다음 지식이 필요합니다.


 

서론

 

이 글은 이전 글 다음 연재된 글로, 실제적인 코드 예제를 통해 몇몇 개념들을 이해하는 데에 집중할 것이다. 이 글을 읽고난 뒤, 리액티브 프로그래밍이 어떤 차이를 만들고, 무엇이 그것을 함수형이 될 수 있도록 하는지에 대해 좀 더 이해할 수 있게 될 것이다. 여기서 다뤄진 예제들은 꽤 추상적인데, 그래도 리액티브 API와 프로그래밍 스타일에 대해 어떻게 생각해야 하는지 알려줄 것이고, 리액티브이 다른 방식들과 비교해 어떻게 다른지에 대한 느낌을 갖게 해 줄 것이다. 이제 리액티브 프로그래밍의 여러 요소들에 대해 살펴볼 것이고, 데이터의 흐름(flow of data) 제어 방법과 필요하다면 백그라운드 스레드에서 처리하는 방법에 대해서 배우게 될 것이다.


 

프로젝트 설정 (Setting up a Project)

 

우리가 명확히 알아야 할 개념을 보여주기 위해, 여기서는 Reactor 라이브러리를 사용할 것이다. 리액티브 코드는 다른 도구를 이용해서도 쉽게 작성될 수 있을 것이다. Copy and Paste를 하지 않고 아래의 코드들을 실행해보고 싶다면 Github에 예제 코드가 등록되어 있으니 확인하길 바란다.

일단 https://start.spring.io/를 이용하여 빈 프로젝트를 하나 만들고 Reactor Core 의존성을 추가한다. Maven을 사용한다면 다음과 같다.

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.0.7.RELEASE</version><!--$NO-MVN-MAN-VER$-->
		</dependency>

Gradle의 경우에도 매우 비슷하다.

     compile 'io.projectreactor:reactor-core:3.0.7.RELEASE'

이제 실제 코드를 작성해보자.


 

무엇이 프로그램을 함수형이 되도록 하는가? (What makes it functional?)

 

함수형 프로그램의 기본 빌딩블럭은 이벤트들의 연쇄(sequence of events)와 이 이벤트 처리에 대한 두 가지 주인공인 배포자(publisher)와 구독자(subscriber)로 이루어진다. 여기서 시퀀스(sequence)를 stream이라고 말해도 - 실제로 그렇기도 하기 때문에 - 무방하다. stream이라는 단어를 써야 하는 경우 소문자 s로 stream을 표시할 것이며, 이는 Java 8에 포함되어 있는 java.util.Stream과는 다른 말이니 혼동하지 않도록 주의해야 한다. 어쨌든 지금부터 배포자와 구독자에 대한 서술에 집중할 것이다. (이것이 Reactive Stream이 실제로 일을 수행하는 방식이기도 하다.)

Reactor는 여기서 예제를 작성하기 위해 사용할 라이브러리인데, 그래서 앞으로 Reactor의 표기법을 따르게 될 것이다. Reactor에서는 배포자를 Flux라고 부른다.(이 클래스는 Reactive Stream의 Publisher 인터페이스를 구현하고 있다.)  RxJava 라이브러리에도 역시 비슷하게 대응하는 기능들이 있는데, 그 경우 Flux 대신 Observable이라고 말하겠지만 결과적으로 코드는 매우 유사할 것이다.(Reactor 2.0은 이것을 Stream이라고 불렀는데 Java 8의 Stream과 같이 얘기하게 되면 혼란을 야기할 수 있다. 그래서 여기서는 Reactor 3.0가 제공하는 새로운 코드만을 사용할 것이다.)

 

제너레이터들(생성기, Generators)

Flux는 특정 POJO 타입 이벤트의 시퀀트에 대한 배포자이며, 그래서 generic 타입이다. 즉, Flux<T>는 T 타입에 대한 배포자이다. Flux에는 다양한 데이터들로부터 자기 자신(Flux 인스턴스)을 생성하는 데 편리한 몇몇 static 메소드들이 등록되어 있다. 예를들어 배열로부터 Flux 인스턴스를 생성하는 방법은 다음과 같다.

 Flux<String> flux = Flux.just("red", "white", "blue");

이 코드로 Flux 인스턴스가 생성되었고, 이것을 이용하여 작업을 수행할 수 있다. 실제로 수행할 수 있는 작업은 두 가지 뿐이다: 조작(변형하거나 다른 시퀀스들과 합침)하거나, 구독(또다른 배포자가 이 역할을 한다.)하거나.

 

단일 값 시퀀스들(Single Valued Sequences)

가끔, 저장소(repository)에서 id를 이용하여 하나의 개체(enitty)를 조회하는 상황과 같이, 요소(element)가 하나만 있거나 전혀 없는 시퀀스를 만들어야 할 때가 있다. 이런 상황을 위해 Reactor에는 Mono라고 하는 타입이 존재한다.Mono에는 Flux와 유사한 API들이 있지만 Flux보다 집중되어 있는데, Flux의 모든 오퍼레이터(operator)들이 단일 값 시퀀스를 처리하는데 어울리는 것은 아니기 때문이다. RxJava에도 (version 1.x의) Single이 있고, 텅 빈 시퀀스를 위해 Completable이 있다. Reactor에서 텅 빈 시퀀스는 Mono<Void>로 표현한다.

 

오퍼레이터들(Operators)

Flux에는 수많은 메소드들이 있는데, 거의 대부분은 오퍼레이터(operator)이다. 여기서 그 모든 오퍼레이터에 대해 다루지는 않을 것인데, 그러기에 더 좋은 곳(javadoc과 같이) 이 있을 것이기 때문이다. 여기서는 오퍼레이터가 무엇이고 이것을 가지고 무엇을 할 수 있는지 감을 잡을 수 있을 정도로만 다룰 것이다.

예를 들어, Flux의 내부에서 발생한 이벤트들이 표준 출력(standard out)으로 로깅되기를 원한다면 .log() 메소드를 호출하면 된다. 혹은, 이 시퀀스를 변형(transform)하길 원한다면 map() 메소드를 이용하면 된다:

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

이 코드에서 우리는 입력된 문자열들을 각각 대문자로 변환(convert)하여 변형(transform)했다. 지금까지는 뭐, 별 거 아니다.

이 작은 예제에서 흥미로운 것은 - 이런 것에 익숙하지 않다면 좀 놀랄 수도 있는데 - 아직 어떤 데이터도 처리되지 않았다는 것이다. 이 코드를 실행해도 아무것도 로깅되지 않을 것인데, 말 그대로 아무 일도 일어나지 않았기 때문이다. (실제로 실행하고 결과를 확인해보라.) Flux 인스턴스의 오퍼레이터를 호출하는 것은 나중에 처리될 수행 계획(plan of execution)을 수립하는 것이다. 이것은 완벽히 선언적(declarative)인데, 그래서 이것을 "함수형"이라고 부르는 것이다. 오퍼레이터들 내의 구현 로직들은 데이터가 흘러가기(flow) 시작한 이후에만 실행되며, 그래서 누군가가 이 (Publisher와 동일한) Flux에 대해 구독(subscribe)할 때까지 아무 일도 일어나지 않는 것이다.

모든 리액티브 라이브러리들에는 이런 동일한 선언적 방식의, 데이터의 시퀀스를 처리하는 함수형 접근법들이 있으며, Java 8의 Streams도 그렇다. Flux 때와 동일한 기능을 Stream으로 표현하면 다음과 같은 유사한 코드가 만들어진다.

Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

이미 Flux에 대해 언급했던 내용이 여기에도 적용된다: 어떤 데이터도 처리되지 않았고, 그저 실행 계획일 뿐이라는 것. 하지만 FluxStream 사이에는 중요한 차이점이 있는데, 이 점으로 인해 리액티브 프로그래밍을 하는 데에 Stream을 이용하는 것이 적절하지 않게 된다. Flux에 훨씬 더 많은 오퍼레이터가 있고, 편리하게 많은 작업을 수행할 수 있다는 점도 있지만, 진정한 차이점은 - 바로 아래에 소개될 것인데 - 데이터를 소비(comsume)하기를 원할 때에서야 비로소 나타난다.

Sebastien Deleuze가 작성한 Reactive Types라는 블로그가 있는데, 여기에 다양한 종류의 streaming과 리액티브 API의 (정의되어 있는 타입들과 그것을 사용하는 방식을 기준으로 작성한) 차이점에 대해 잘 설명되어 있다. 그리고 FluxStream의 차이점이 보다 상세히 소개되어 있다.

 

구독자들(Subscribers)

데이터가 흘러가게(flow)하기 위해, subscribe() 메소드들 중 하나를 이용하여 Flux에 대해 구독해야 한다. 이 메소드들만이 데이터가 흘러가게 할 수 있다. subscribe 메소드들은 시퀀스에 대해 정의했던 오퍼레이터의 연쇄(chain)를 거슬러 올라가서 배포자에게 데이터의 생성을 시작할 것을 요청한다.지금까지 작업했던 예제를 예로 들면, 내재돼 있는 문자열 컬랙션이 반복처리(iterated)된다. 좀 더 복잡한 사용 예를 예로 든다면, 파일시스템으로부터 파일을 읽도록 할 수도 있고 데이터베이스로부터 조회할 수도 있으며, HTTP 서비스를 호출할 수도 있다.

subscribe() 메소드들 실제로 호출하는 예는 다음과 같다.

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

이에 대한 출력은 다음과 같다.

09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

이 로그로부터 알 수 있는 점은, 매개변수 없는 subscribe() 메소드가 모든 데이터를 전달해줄 것을 배포자에게 요청한다는 것이다. 오직 하나의 request() 가 로깅되어 있고, "unbounded"로 지정되어 있는 것을 확인할 수 있다. 또, 배포된 각각의 아이템들을 처리하는 콜백(onNext())과, 시퀀스의 마지막을 위한 콜백(onComplete()), 그리고 원래의 구독을 위한 콜백(onSubscribe())이 있다는 것을 확인할 수 있다. 이 이벤트들에 대해 직접 리스닝을 하는 것이 필요하다면 doOn*() 메소드를 사용하면 되는데, 이들도 역시 오퍼레이터로 데이터가 흘러가도록 하지 않는다.

subscribe() 메소드는 중복정의되어 있으며, 각각 특정한 작업이 수행되도록 다양한 선택권을 준다. 가장 중요하면서도 편리한 형태는 콜백을 매개변수로 받는 subscribe() 메소드이다. 첫 번째 매개변수는 Consumer인데, 이는 각 아이템에 대한 콜백이며, 필요하다면 에러를 위한 Consumer를 선택적으로 추가할 수도 있고, 시퀀스가 종료된 후 실행되도록 하기 위해 순수한 Runnable 추가할 수도 있다. 아이템 당 콜백만을 이용하는 예제는 다음과 같다:

 Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

이에 대한 출력은 다음과 같다.

09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

또, 데이터의 흐름을 제어하고, 여러가지 방법으로 (흐름을) "묶을(bound)" 수 있다. Subscription을 위한 저 수준 API는 Subscriber를 통해 획득할 수 있다. 위와 동일한 기능을 하는, 다소 긴 subscribe() 코드는 다음과 같다.

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

}); 

예를 들어, 한 번에 최대 두 개의 아이템이 소비되도록 흐름을 제어하고 싶다면, Subscription을 보다 똑똑하게 사용할 수 있다.

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
... 

Subscriber는 한 번에 두 개씩 아이템들을 일괄 처리한다. 이런 처리방식은 일반적이기 때문에, 편의를 위해 이런 기능을 별도의 클래스로 추출해두는 것도 고려해볼 만 하다. 이에 대한 출력은 다음과 같다.

09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

사실, 이런 배치 처리 구독자는 워낙 일반적으로 사용되기 때문에 이미 Flux 내에 메소드가 추가되어 있다. 위의 배치 처리 예제는 다음과 같이 표현할 수 있다.

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(subscribe() 메소드의 매개변수로 요청 제한(request limit)이 추가되어 있는 것을 확인할 수 있다.) 이 코드의 실행 결과는 다음과 같다.

10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete() 

스프링 Reactive 웹과 같이, 시퀀스들을 처리해주는 라이브러리들은 구독(subscription)을 수행를 할 수 있다. 그래서 이런 류의 고려사항들은 스택의 아랫쪽으로 내릴 수록 좋은데, 그렇게 함으로써 비즈니스 로직과 관련이 없는 코드가 비즈니스 로직 코드를 지저분하게 하는 것을 막을 수 있고, 결과적으로 코드의 가독성을 높이고 테스트하거나 유지보수하기 용이하게 될 것이기 때문이다. 그래서 규칙을 하나 말한다면, 가능한 한 시퀀스에 대해 직접 구독하지 않는 것이 좋으며, 그렇지 않다면 적어도 그런 코드를 비즈니스 로직 레이어 외의 처리 레이어(processing layer)에 두는 것이 좋다.

 

스레드, 스케쥴러, 그리고 백그라운드 처리(Threads, Schedulers, and Background Processing)

위의 로그에서 흥미로운 부분은 모든 로그가 "main" 스레드에서 출력되었다는 점인데, 이는 subscribe() 메소드의 호출자가 main 스레드라는 의미이다. 이를 통해 중요한 사실을 알 수 있게 된다: Reactor는 스레드를 지독히도 절약하는데, 이는 가능한 최대의 성능을 낼 수 있도록 보장하기 위해서이다. 사실 이 말은 지난 5년간, 서비스의 성능을 조금이라도 더 쥐어짜기 위해 스레드와 스레드풀, 그리고 비동기 처리에 대해 논쟁한 적이 있는 사람에게는 매우 이상하게 들릴 것이다. 하지만 이것은 사실이다: 아무리 JVM이 여러 개의 스레드에 대한 동시 처리를 최적화할 수 있다 하더라도, 스레드들 간의 명시적인 스위칭없이, 단일 스레드 내에서 처리하는 것이 언제나 더 빠르다. Reactor는 모든 비동기 처리를 제어하는 열쇠를 사용자에게 건내주고, 사용자가 무엇을 해야할지 알고 있다고 가정한다. 

Flux는 스레드 바운더리를 제어할 수 있는 몇 개의 설정 메소드들을 제공한다. 예를 들어, Flux.subscribeOn() 메소드를 이용하여 구독이 백그라운드 스레드에서 처리되도록 설정할 수 있다:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2); 

이 코드의 실행 결과는 다음과 같다.

13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

이 코드를 직접 작성했든 아니면 Copy and Paste를 했든, JVM이 종료되기 전에 이 처리가 끝날 때 까지 기다려야(wait) 한다.

로그에서 구독 뿐만 아니라 각각의 처리들이 하나의 백그라운드 스레드(parallel-1-1)에서 발생했음을 확인할 수 있다 - 이는 우리가 코드에서 Flux에 대해 백그라운드로 구독할 것을 요청했기 때문이다. 각 아이템에 대한 처리가 CPU 집약적인 작업이라면 이렇게 해도 무방하다.(하지만 백그라운드 스레드를 사용했다고 해도, 컨텍스트 스위치에 대한 비용을 지불했지만 처리가 빨라지지는 않기 때문에 무의미한 일이라고 할 수 있다.) 또, I/O 집약적인, 그리고 아마도 블록킹 I/O를 사용하는 아이템 처리를 하고 싶을 수도 있을 것이다. 그리고 이 경우, 호출자가 대기하지 않도록 최대한 빨리 처리하고 싶을 것이다. 이 경우에도 우리의 친구 스레드풀을 이용할 수 있는데, 이 때에도 Schedulers.parallel() 메소드를 호출하면 된다. 각각의 아이템들의 처리를 (스레드풀의 제한 범위 내에서) 별도의 스레드가 수행하도록 변경하려면, 하나의 Flux를 별도의 분리된 배포자로 나누어야 하고, 각각의 배포자들이 각각의 백그라운드 스레드에서 수행되도록 설정해야 한다. 이를 위해 flatMap() 이라는 오퍼레이터를 이용할 수 있는데, 이것은 각 아이템들을 하나의 (다른 타입일 수도 있는) Publisher로 변경하고, 그 새로운 타입의 시퀀스로 돌아간다:

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

여기서 알아야 하는 점은, flatMap()을 사용하면 아이템들은 "자식" 배포자로 내려보내지고, 이렇게 되면 전체 시퀀스가 아니라 각각의 아이템들에 대해 구독을 제어할 수 있다는 것이다. Reactor는 가능한 단 하나의 스레드만 사용하여 행동하도록 제작되어 있기 때문에, 백그라운드 스레드에서 하나 혹은 한 그룹의 아이템을 처리하도록 하기 위해 명시적으로 설정해야 한다. 사실, 여기서 사용한 예제는 병렬 처리를 강제하는 몇몇 잘 알려진 트릭 중 하나이다. (이에 대한 상세한 내용은 Reactive Gems를 참고하길 바란다.)

이 코드를 실행하면 다음과 같은 결과를 얻을 수 있다.

15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE 

여기서 알 수 있는 점은, 각 아이템들을 소비하기 위해 여러 개의 스레드가 사용되고 있다는 것, 그리고 flatMap()이 사용 가능한 스레드가 있기만 하다면 한 번에 두 개까지 동시에 처리될 수 있도록 한다는 것이다. request(1)이라고 되어 있는 것을 볼 수 있는데, 이는 시스템이 파이프라인 내에 두 개의 아이템들을 유지하려고 하고 이 두 개의 아이템들이 동시에 종료되지 않기 때문이다. 여기서 Reactor가 매우 똑똑하게 동작하는데, Reactor는 구독자가 대기하는 시간을 최소화하기 위해 상위 Publisher로부터 아이템들을 미리 가져(pre-fetch)온다.(여기서는 그것을 확인할 수는 없는데 아이템의 개수가 세 개 밖에 되지 않기 때문이다.)

세 개의 아이템("red", "white", "blue")은 하나 이상의 스레드가 동작하는 것을 확인하기에 너무 적은 수이므로 데이터를 더 많이 생성해 넣는 것이 좋을 수 있다. 예를 들어 난수 생성기를 이용하여 테스트해 볼 수 있을 것이다.

Flux에는 또, publishOn() 이라는 메소드가 있는데, 이는 구독자 자신을 위한 것이 아니라 리스너를 위한 메소드이다. (즉, onNext() 혹은 소비자를 위한 콜백이다.) :

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
}); 

이 결과는 다음과 같다.

15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE 

여기서 소비자의 콜백("Consumed:..." 로그)가 pub-1-1이라는 구독자용 스레드에서 실행된 것을 확인할 수 있다. 만약 이 코드에서 subscribeOn()을 제거한다면 데이터의 두 번째 덩어리(chunk)들도 pub-1-1 스레드에서 처리되는 것을 확인할 수 있을 것이다. 다시 한 번 얘기하지만, Reactor는 스레드를 절약하는 경향이 있기 때문에, 명시적으로 스레드를 스위치해 달라는 요청이 없다면 현재 사용하고 있는 스레드를 지속적으로 사용하게 된다.

subscribe(null, 2)로 되어 있는 코드를 publishOn() 메소드에 prefetch=2 매개변수를 입력하는 것으로 바꿀 수 있다. 이 경우 subscribe()에 지정된 fetchSize 값은 무시된다.

subscribeOn()을 제거하면 다음과 같은 결과를 얻는다.

15:38:52.191 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:38:52.229 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
15:38:52.243 [main] INFO reactor.Flux.Array.1 - | request(2)
15:38:52.244 [main] INFO reactor.Flux.Array.1 - | onNext(red)
15:38:52.244 [main] INFO reactor.Flux.Array.1 - | onNext(white)
15:38:52.244 [pub-1] DEBUG com.sample.demo.Main - Consumed: RED
15:38:52.244 [pub-1] DEBUG com.sample.demo.Main - Consumed: WHITE
15:38:52.244 [pub-1] INFO reactor.Flux.Array.1 - | request(2)
15:38:52.244 [pub-1] INFO reactor.Flux.Array.1 - | onNext(blue)
15:38:52.245 [pub-1] INFO reactor.Flux.Array.1 - | onComplete()
15:38:52.245 [pub-1] DEBUG com.sample.demo.Main - Consumed: BLUE 

 

추출자: 어둠의 구독자(Extractors: The Subscribers from Dark Side)

시퀀스를 구독하는 또다른 방법이 있는데, Mono.block()이나 Mono.toFuture(), 혹은 Flux.toStream() 메소드를 호출하는 것이다.(이들은 "추출자" 메소드인데, 이들은 리액티브 타입을 덜 유연한 블록킹 추상이다.) Flux에는 또 collectList()collectMap()과 같은 변환기(converter)들이 있는데, 이 메소드들은 Flux를 Mono로 변환해준다. 이들은 사실 시퀀스를 구독하는 것이 아니라, 개별 아이템 레벨에서 구독을 통해 갖고 있었을 모든 제어권을 없애버린다.

가장 좋은 원칙은 "추출자를 호출하지 않는" 것이다. 여기에도 몇 가지 예외가 있는데(그렇지 않았다면 이런 메소드가 존재하지도 않았을 것이다.), 그 중 가장 주목할 만한 것은 테스트를 할 때이다. 테스트를 할 때에는 결과들을 블록킹하여 축적하는 것이 매우 유용하기 때문이다.

이 메소드들은 리액티브 객체로부터 블럭킹으로의 비상통로와 같은 역할을 한다; Spring MVC와 같은 기존 API를 조정해야 할 경우와 같이 말이다. Mono.block() 메소드를 호출하면 Reactive Stream의 모든 혜택을 사용할 수 없게 된다. 이것이 Reactive Stream과 Java 8 Streams의 주요한 차이점이다. Java의 Stream은 "전체 혹은 아무것도 아님"이라는 구독 모델만을 갖고 있는데, 이는 Mono.block()도 동일하다. 물론 subscribe() 메소드도 호출된 스레드를 블록할 수 있고, 그래서 추출자만큼이나 위험하긴 하지만, 그래도 더 많은 제어권을 갖을 수 있다.- subscribeOn() 메소드를 이용하여 블록킹되지 않도록 할 수 있고, 배압을 적용하고 주기적으로 계속 진행할지 말지를 결정하여 아이템들이 하나씩 소비되도록 할 수 있다.


 

Conclusions

이 글에서 Reactive Stream과 Reactor API들의 기본적인 내용에 대해 다뤄보았다. 보다 더 많은 내용을 알고 싶다면 참고할만한 내용들이 도처에 널려있긴 하지만, 직접 코딩하는 것만큼 도움이 되지는 않을 것이다. 그러므로 (이 글을 쓰는데 사용된 테스트 코드를 담고 있는) GitHub의 코드를 사용해 보거나, Lite RX Hands On 워크샵을 주의깊게 살펴보길 추천한다. 지금까지, 확실히 내용에 오버헤드가 많았고, 리액티브 도구를 사용했다면 수행하지 못했을 내용에 대해서도 상세히 다루지 못한 것이 사실이다. 다음 글에서는 리액티브 모델의 블록킹, 디스패칭, 그리고 비동기적인 측면들에 대해 보다 깊이 다룰 것이고, 그래서 리액티브 프로그래밍을 했을 때 얻을 수 있는 실제 혜택에 대해서 설명할 것이다.