Page tree
Skip to end of metadata
Go to start of metadata

Introduction

이 글에서는 연습문제를 통해 React3의 사용법에 대해 소개한 TechIO의 Reactive Programming with Reactor3의 내용을 발췌/번역하고 있습니다.

Spring Reactor의 커미터 중 한 명인 Sebastien Deleuze가 작성한 것으로, Reactor의 기본적인 사용법을 익히는 데 도움을 줄 것입니다. 소스코드는 https://github.com/reactor/lite-rx-api-hands-on.git 에서 얻을 수 있습니다.

Prerequisite

이 글을 원활히 읽기 위해서는 다음 지식이 필요합니다.

  • 리액티브 프로그래밍 개요
  • JAVA8+ 람다식

 

1. Flux

Flux(RxJava2에서는 Observable)에 대해서는 이미 여러번 다룬 바 있습니다. 이에 대해 다시 한 번 정리하면 다음과 같습니다.

Flux는 Reactive Streams에서 정의한 Publisher<T> 인터페이스의 구현체로, 하나의 Flux로부터 또다른 Flux 혹은 Mono(다음 절에서 설명)를 생성할 수 있도록 수많은 API를 제공한다.

 0개에서 n개의 데이터를 방출(emit) 할 수 있는데, 하나의 데이터를 방출할 때 마다 onNext 이벤트를 발생하며, doOnNext로 등록된 @FunctionalInterface Consumer<T>가 호출된다.

Flux 내의 모든 데이터에 대한 처리가 완료되면 onComplete 이벤트가, 오류가 발생하면 onError 이벤트가 발생되며, 역시 각각 등록된 함수형 인터페이스 구현체가 호출된다. 이들 종료 이벤트가 발생하지 않는 경우, 이 Flux는 무한히 데이터를 방출하게 된다.

  • Flux에는 Flux 소스를 생성하거나 여러가지 콜백 타입들로부터 Flux를 생성하도록 하는 정적 팩토리 메소드가 등록되어 있다.
  • 오퍼레이터(operator), 즉, 인스턴스 메소드들을 이용하면 비동기적인 처리 순서를 생성하는 비동기 처리 파이프라인을 구성할 수 있다.
  • 각각의 Flux.subscribe(...) 메소드나 Flux.publish 혹은 Flux.publishNext와 같은 멀티캐스트 오퍼레이터를 이용하면 처리 파이프라인에 대한 객체를 생성하고 그 안에서 데이터가 흘러가도록 만든다.

Marble diagram representation of a Flux

이제 Flux에 대한 연습문제를 풀어봅시다.

Creating Flux
public class Part01Flux {
	/**
	 * TODO 텅 비어있는 Flux를 리턴하라.
	 */
	Flux<String> emptyFlux() {
        return Flux.empty();
	}


	/**
	 * TODO 문자열 "foo"와 "bar" 두 개의 값을 갖는 Flux를 배열이나 컬렉션을 이용하지 않고 리턴하라.
	 */
	Flux<String> fooBarFluxFromValues() {
        return Flux.just("foo", "bar").doonne;
	}


	/**
	 * TODO 문자열 "foo"와 "bar" 두 개의 값을 리스트를 이용하여 Flux를 리턴하라.
	 */
	Flux<String> fooBarFluxFromList() {
        return Flux.fromIterable(Lists.newArrayList("foo", "bar"));
	}


	/**
	 * TODO IllegalStateException를 throw하는 Flux를 리턴하라.
	 */
	Flux<String> errorFlux() {
        return Flux.error(new IllegalStateException());
	}


	/**
	 * TODO 100ms마다 0부터 9까지의 데이터를 방출하는 Flux를 리턴하라.
	 */
    Flux<Long> counter() {
        return Flux.interval(Duration.ofMillis(100)).take(10L);
	}
}

 

Flux를 생성하는 것 자체는 그리 어렵지 않습니다. 생성된 Flux를 변경하고 조합하는 다양한 방법에 대해 곧 다룰 것입니다.


 

2. Mono

Mono 역시, Flux와 마찬가지로 Publisher<T>의 구현체이다. Flux와 다른 점은 0개 혹은 1개의 데이터만을 방출할 수 있다는 것이다. (그리고 그렇기 때문에 거기에 걸맞는 메소드가 등록되어 있다.)

아무 데이터도 방출하지 않을 때는 Mono<Void>를 사용할 수 있는데, 처리가 종료되었는지 여부만이 중요한 경우 사용될 수 있다.  또, 몇몇 메소드들은 오히려 새로운 Flux를 생성하여 리턴해주기도 하는데, 이는 데이터 흐름 속에서 처리되는 개체의 수가 변경될 수 있음을 의미한다. 즉, Flux에도 마찬가지로 새로운 Mono를 생성하는 메소드가 등록되어 있다.

 

Marble diagram representation of a Mono

그 외의 내용들은 기본적으로 Flux와 동일합니다. 이제 Mono에 대한 연습문제를 풀어보겠습니다.

Creating Mono
public class Part02Mono {
	// TODO 텅 빈 Mono를 리턴하라.
	Mono<String> emptyMono() {
        return Mono.empty();
	}


	// TODO 어떤 데이터도 방출하지 않는 Mono를 리턴하라.
	Mono<String> monoWithNoSignal() {
        return Mono.never();
	}


	// TODO "foo"라는 문자열 값을 포함하는 Mono를 리턴하라.
	Mono<String> fooMono() {
        return Mono.just("foo");
	}


	// TODO IllegalStateException을 방출하는 Mono를 리턴하라.
	Mono<String> errorMono() {
        return Mono.error(new IllegalStateException());
	}
}

Mono를 생성하는 방법도 역시 어렵지 않습니다. 그럼 이제 Reactor 코드를 테스트할 수 있도록 해주는 StepVerifier에 대해 알아보도록 하겠습니다.


 

3. StepVerifier

선언적 프로그래밍(Declarative Programming)의 단점으로 테스트하기 어렵다는 점이 꼽히고 있습니다. 이를 보완하기 위해 Reactor에서는 StepVerifier라는 Flux/Mono 테스트 도구를 제공하고 있습니다.

StepVerifier는 reactor-test 모듈에 포함되어 있는 클래스로, 모든 Publisher<T> 구현체의 처리 단계(step)을 확인할 수 있도록 한다. StepVerifier에는 Publisher의 동작 방식을 예측(expect)할 수 있는 수많은 메소드들이 등록되어 있는데, 이 메소드들은 DSL(Domain Specific Language) 방식으로 사용된다.

모든 예측이 끝나면 반드시 verifyXXX() 메소드를 호출해야 하는데, 그렇지 않은 경우 Publisher 내부에서 데이터가 흘러가지 않는다. 이 처리 방식은 Publisher의 subscribe() 메소드의 그것과 동일하다.

사실 StepVerifier의 사용법은 여기서 다룰 내용보다 훨씬 방대합니다. 자세한 내용은 레퍼런스 문서와 JavaDoc을 참고하시기 바랍니다.

이제 StepVerifier에 대한 연습문제를 풀어보겠습니다.

Using StepVerifier for testing Reactor Code
public class Part03StepVerifier {
	// TODO 입력된 flux가 "foo"와 "bar"를 방출하고 정상적으로 종료(complete)되는지 확인하는 StepVerifier를 작성하라.
	void expectFooBarComplete(Flux<String> flux) {
        StepVerifier.create(flux).expectNext("foo", "bar").verifyComplete();
		//아래 코드도 동일하게 동작함
		//StepVerifier.create(flux).expectNext("foo").expectNext("bar").verifyComplete();
	}


	// TODO 입력된 flux가 "foo"와 "bar"를 방출하고 그 뒤 RuntimeException을 throw하는지 확인하는 StepVerifier를 작성하라.
	void expectFooBarError(Flux<String> flux) {
        StepVerifier.create(flux).expectNext("foo", "bar").expectError(RuntimeException.class);
	}


	// TODO 입력된 flux가 사용자의 username이 swhite와 jpinkman을 순차적으로 방출하고 정상적으로 종료하는지 확인하는 StepVerifier를 작성하라.
	void expectSkylerJesseComplete(Flux<User> flux) {
	    StepVerifier.create(flux)
	        .expectNextMatches(u -> "swhite".equals(u.getUsername()))
	        .expectNextMatches(u -> "jpinkman".equals(u.getUsername()))
	        .verifyComplete();
	}


	// TODO 입력된 flux가 총 10개의 데이터를 방출하고 정상적으로 종료되는지 확인하는 StepVerifier를 작성하고, 실제 코드의 수행 속도를 확인하라.
	void expect10Elements(Flux<Long> flux) {
        StepVerifier.create(flux).expectNextCount(10).verifyComplete();
	}


	// TODO 입력된 flux가 1초에 하나씩 3600초 동안 3600개의 데이터를 방출하는기 확인하는 StepVerifier를 작성하라.
    // 가상 시간을 적용하기 위해 StepVerifier.withVirtualTime을 이용하고, 실제 시간이 얼마나 걸리는지를 확인하라.
	void expect3600Elements(Supplier<Flux<Long>> supplier) {
        StepVerifier.withVirtualTime(supplier).thenAwait(Duration.ofHours(3)).expectNextCount(3600).verifyComplete();
	}
} 

 

4. Transform

생성된 Flux/Mono는 다양한 방식으로 다른 형태, 다른 데이터로 변형될 수 있습니다. 

Transform with Mono.map
 	// Mono.map 메소드와 람다식을 이용하여 입력데이터를 새로운 데이터로 변경할 수 있다.
    // TODO User 객체의 username, firstname, lastname 값을 대문자로 변경하라.
	Mono<User> capitalizeOne(Mono<User> mono) {
        return mono.map(u -> 
            new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase())
        );
	}
Transform with Flux.map
	// TODO User 객체의 username, firstname, lastname 값을 대문자로 변경하라.
	Flux<User> capitalizeMany(Flux<User> flux) {
		return flux.map(
		    u -> new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase())
		);
	} 

 

이제 문자열을 대문자로 변경해주는 웹서비스를 호출해야 한다고 생각해 봅시다. 웹서비스 호출에는 대기시간(latency)이 발생할 수 밖에 없고, 그래서 더 이상은 동기화된(synchronous) map 메소드를 사용할 수 없습니다. 대신 Flux나 Mono와 같은 비동기 호출(asynchronous call)을 이용해야 하는데 이 때 사용하는 오퍼레이터가 flatMap 입니다.

flatMap은 U 객체 대신 Publisher<U>를 리턴하는 @FuntionalInterface Function 객체를 매개변수로 받습니다. 이 때 리턴되는 Publisher는 각 엘리먼트들에 대한 비동기적 변환작업(asynchronous transformation)을 나타냅니다. 만약 그냥 map 메소드를 사용했다면 Flux<Publisher<U>>의 스트림을 리턴받게 되는데, 이는 그다지 유용하지 않습니다. 반면 flatMap은 내부의 Publisher를 어떻게 다뤄야할지 알고 있습니다. 각각의 데이터를 수집한 뒤 이들을 하나의 전역 출력 객체로 병합하므로 Flux<U>보다 훨씬 유용합니다. 만약 내부 Publisher로부터의 값이 서로 다른 시간에 도달하는 경우, 이들은 결과 Flux 내에 끼워넣어지게 됩니다.

 

Asynchronous call with flatMap
	// TODO asyncCapitalizeUser메소드를 이용하여 User 객체의 username, firstname, lastname 값을 대문자로 변경하라.
	Flux<User> asyncCapitalizeMany(Flux<User> flux) {
        return flux.flatMap(u -> asyncCapitalizeUser(u));
	} 

 

5. Merge

병합(Merge)는 여러 개의 Publisher로부터 전달되는 데이터의 흐름을 하나의 Flux로 합치는 작업을 의미합니다. 이 때 mergeWith 메소드를 사용하게 됩니다.

여기서 주의할 점은, mergeWith 메소드를 이용하는 경우, 먼저 도착하는 데이터가 먼저 처리되도록 Flux가 생성된다는 것입니다. 즉 병합되는 Flux들의 순서가 보장되지 않고, 늦게 도착하는 데이터는 끼워넣어지게(interleave) 됩니다.

Merge Fluxes with mergeWith Method
	// TODO flux1과 flux2를 합병하되 먼저 도착하는 것이 먼저 처리되도록 하라.
	Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
        return flux1.mergeWith(flux2);
	} 

만약 병합되는 Flux들의 순서를 지키고 싶다면, mergeWith 대신 concat 메소드를 사용해야 합니다. 

Merge Fluxes with concat
 	// TODO flux1과 flux2를 합병하되 flux1을 먼저 처리하고 flux2가 다음에 처리되도록 하라.
	Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
		return flux1.concatWith(flux2);
	}

또, 여러 개의 Mono를 병합하여 하나의 Flux를 만들 수도 있습니다.

Merge Monos to a Flux
	// TODO mono1을 처리하고 mono2를 그 다음에 처리하는 Flux를 생성하라.
	Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
        return Flux.concat(mono1, mono2);
	} 

 

6. Request

Publisher and Subscriber

리액티브 프로그래밍의 기본 메커니즘 중 하나인 backpressure - 배압에 대해 다뤄볼 차례입니다. 배압은 Subscriber가 Publisher로 '데이터 방출 속도를 제한'하는 신호(signal)을 보내는 피드백 메커니즘을 말합니다.

이 요구에 대한 제어는 Subscription 단계에서 수행됩니다. 매 subscribe() 메소드 호출을 위해 하나의 Subscription 객체가 생성되는데, 데이터의 흐름을 취소하기 위해 cancel() 메소드를 호출할 수도 있고 데이터의 양을 조절하기 위해 request(long) 메소드를 호출할 수도 있습니다. 

request(Long.MAX_VALUE)는 제한 없이 전송할 것을 요구하는 것이며, 이럴 경우 Publisher는 가능한한 가장 빠른 속도로 데이터를 방출하게 됩니다.

StepVerifier를 생성하는 방법은 크게 두 가지로, 위에서 이미 다루었던 create(Publisher<T>) 메소드와 withVirtualTime(Supplier<? extends Publisher<? extends T>>) 두 가지가 있습니다. StepVerifier가 생성된 후 thenRequest(long) 메소드를 이용하여 배압을 설정할 수 있는데, 배압이 설정되면 설정된 값 만큼만 데이터가 방출됩니다.

이제 요청 처리에 대한 연습문제를 풀어보겠습니다.

control publisher request
public class Part06Request {
	ReactiveRepository<User> repository = new ReactiveUserRepository();


	// TODO 입력 flux에 모든 데이터를 방출할 것을 요청한 뒤, 그 결과 네 개의 데이터가 방출되는지 확인하는 StepVerifier를 생성하라.
	StepVerifier requestAllExpectFour(Flux<User> flux) {
        return StepVerifier.create(flux).thenRequest(Long.MAX_VALUE).expectNextCount(4).expectComplete();
	}


	// TODO  입력 flux에 하나의 데이터를 요청하면 User.SKYLER가, 다시 하나의 데이터를 요청하면 User.JESSE가 방출되는지 확인하는 StepVerifier를 생성하라.
	StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
		return StepVerifier.create(flux).thenRequest(1).expectNext(User.SKYLER).thenRequest(1).expectNext(User.JESSE).thenCancel();
	}


	// TODO User repository로부터 모든 데이터를 받은 뒤, 모든 Reactive Streams 신호에 대해 자동으로 로그를 남기를 Flux를 리턴하라.
	Flux<User> fluxWithLog() {
        return repository.findAll().log();
	}


	// TODO subscribe할 때 "Starring:"을, 각각의 데이터에 대해 "firstname, lastname"을, 그리고 완료되었을 때 "The end!"를 출력하는 Flux를 리턴하라.
	Flux<User> fluxWithDoOnPrintln() {
		return repository.findAll()
		        .doOnSubscribe(u -> System.out.println("Starring:"))
		        .doOnNext(u -> System.out.println(u.getFirstname() + " " + u.getLastname()))
		        .doOnComplete(() -> System.out.println("The end!"));
	}
} 

 

7. Error

Reactor는 데이터 처리 과정에서 오류가 발생했을 때 그에 대해 대처할 수 있는 방법을 제공합니다. 어떤 메소드는 실패했을 때 예외를 throw하기도 하고, 기본 값을 이용하도록 하기도 하며, 동일한 데이터를 다시 한 번 요청하도록 하기도 합니다.

에러 처리 내용은 상대적으로 간단하니 바로 연습문제로 에러 처리 기능을 확인해보겠습니다.

Error handling in Reactor
public class Part07Errors {
	// TODO 입려된 Mono를 처리하는 과정에서 오류가 발생한 경우 User.SAUL을 방출하되, 그렇지 않은 경우는 원래의 데이터를 방출하는 mono를 리턴하라.
	Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
        return mono.onErrorReturn(User.SAUL);
	}


	// TODO 입력된 Flux를 처리하는 과정에서 오류가 발생하는 경우 User.SAUL과 User.JESSE를 방출하되, 그렇지 않은 경우는 원래의 데이터를 방출하는 Flux를 리턴하라.
 Return a Flux<User> containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.
	Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
        return flux.onErrorResume((e) -> Flux.just(User.SAUL, User.JESSE));
	}


	// TODO capitalizeUser 메소드를 이용하여 입력 flux의 각 user들의 값을 대문자로 바꾸되, GetOutOfHereException 예외가 발생한 경우 해당 예외를 포함하고 있는 예외를 방출하는 Flux를 작성하라.
	Flux<User> capitalizeMany(Flux<User> flux) {
        return flux.map(u -> {
            try {
                return capitalizeUser(u);
            } catch (GetOutOfHereException e) {
                throw Exceptions.propagate(e);
            }
        });
	}

	User capitalizeUser(User user) throws GetOutOfHereException {
		if (user.equals(User.SAUL)) {
			throw new GetOutOfHereException();
		}
		return new User(user.getUsername(), user.getFirstname(), user.getLastname());
	}

	protected final class GetOutOfHereException extends Exception {
        private static final long serialVersionUID = 1L;
	}
} 

 

8. Adapt

Reactor는 Reactive Streams API를 구현하고 있으며, 그래서 다른 Reactive Streams 구현체로 변환될 수 있는 몇 가지 팩토리 메소드들을 제공합니다. 이는 다른 구현체들도 마찬가지입니다.

이에 대한 간단한 연습문제를 풀어보겠습니다. (이곳의 모든 메소드들은 두 개씩 묶여서 테스트가 수행됩니다. 두 개씩 구현하고 실행해야 오류가 발생하지 않습니다.) 

Adapting to other Reactive Streams implementation
public class Part09Adapt {
	ReactiveRepository<User> repository = new ReactiveUserRepository();


	// TODO Flux를 RxJava Flowable로 변환하라.
	Flowable<User> fromFluxToFlowable(Flux<User> flux) {
        return Flowable.fromPublisher(flux);
	}
	// TODO RxJava Flowable을 Flux로 변환하라.
	Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
        return Flux.from(flowable);
	}
//========================================================================================
	// TODO Flux를 RxJava Observable로 변환하라.
	Observable<User> fromFluxToObservable(Flux<User> flux) {
        return Observable.fromPublisher(flux);
	}
	// TODO RxJava Observable을 Flux로 변환하라.
	Flux<User> fromObservableToFlux(Observable<User> observable) {
        return Flux.from(observable.toFlowable(BackpressureStrategy.LATEST));
	}
//========================================================================================
	// TODO Mono를 RxJava Single로 변환하라.
	Single<User> fromMonoToSingle(Mono<User> mono) {
        return Single.fromPublisher(mono);
	}
	// TODO RxJava Single를 Mono로 변환하라.
	Mono<User> fromSingleToMono(Single<User> single) {
        return Mono.from(single.toFlowable());
	}
//========================================================================================
	// TODO Mono를 Java 8+ CompletableFuture로 변환하라.
	CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
        return mono.toFuture();
	}
	// TODO Java 8+ CompletableFuture를 Mono로 변환하라.
	Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
        return Mono.fromFuture(future);
	}
}

 

9. Other Operations

Reactor는 지금까지 다룬 기능들 외에도 다양한 기능들을 제공합니다. 자세한 내용은 레퍼런스 가이드 문서와 FluxMono에 대한 JavaDoc을 확인해보시기 바라며, 연습문제를 풀어보겠습니다.

Other Operations of Publishers
 public class Part08OtherOperations {


	// TODO username, firstname, lastname Flux들을 합쳐 하나의 Flux<User>를 생성하라.
	Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
	    return Flux.zip( 
                (args) -> {
                    return new User((String) args[0], (String) args[1], (String) args[2]);
                },
                usernameFlux, firstnameFlux, lastnameFlux
	    );
	}


	// TODO 두 개의 Mono 중 빨리 리턴하는 쪽을 리턴하라.
	Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
        return Mono.first(mono1, mono2);
	}


	// TODO 두 개의 flux 중 첫 번째 값을 빨리 리턴하는 flux를 리턴하라.
	Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
        return Flux.firstEmitting(flux1, flux2);
	}


	// TODO 입력 Flux에 대해 Flux의 종료 신호를 표상하는 Mono<Void>로 변환하라.
	Mono<Void> fluxCompletion(Flux<User> flux) {
        return flux.then();
	}


	// TODO 입력 user가 null이든 아니든 유효한(valid) Mono를 리턴하라.(힌트. Reactive Streams는 null 값을 허용하지 않는다.)
	Mono<User> nullAwareUserToMono(User user) {
        return Mono.justOrEmpty(user);
	}


	// TODO 입력된 Mono가 비어 있는 경우 User.SKYLER를, 그렇지 않은 경우 원래의 값을 방출하는 Mono를 리턴하라.
	Mono<User> emptyToSkyler(Mono<User> mono) {
        return mono.defaultIfEmpty(User.SKYLER);
	}

 

10. Reactive to blocking

가끔 기존 소스를 이용하기 위해 리액티브 타입들을 블록킹 타입으로 변경해야 하는 경우가 있습니다. Mono의 경우 block() 메소드를, Flux의 경우 toIterable() 등의 메소드를 이용할 수 있습니다. 이 메소드들은 처리 과정에서 onError 이벤트를 발생된 경우 해당 Exception을 던지게 됩니다.

주의할 점은, 가능한 한 전체 코드를 리액티브 방식으로 작성해야 한다는 것입니다. 또 리액티브 코드의 중간에 이 방법을 사용해서는 안 되는데, 이럴 경우 전체 리액티브 파이프라인이 잠재적으로 락이 걸릴 수 있기 때문입니다.

이에 대한 간단한 연습문제를 살펴보면 다음과 같습니다.

Reactive to blocking
public class Part10ReactiveToBlocking {


	// TODO Mono 내의 User 데이터를 리턴하라.
	User monoToValue(Mono<User> mono) {
		return mono.block();
	}


	// TODO Flux 내의 User들을 리턴하라.
	Iterable<User> fluxToValues(Flux<User> flux) {
		return flux.toIterable();
	}
}

 

11. Blocking to Reactive

리액티브 프로그래밍을 할 때 큰 문제점 중 하나는, "리액티브 코드가 아닌 기존 코드를 어떻게 다뤄야 하나?" 입니다. 데이터베이스에 대한 JDBC 연결과 같이 블록킹되는 코드가 있고, 이 코드를 성능에 큰 문제가 없도록 하면서 리액티브 파이프라인에 통합하고 싶은 경우 말입니다.

가장 좋은 방법은 Scheduler를 통해 블록킹 코드가 자신만의 실행 맥락(execution context) 내에서 실행되도록 고립하는 것이고, 나머지 파이프라인은 고효율을 유지하도록 하면서 필요한 만큼만의 thread를 생성하는 것입니다.

JDBC 예에서 fromIterable 팩토리 메소드를 이용하여 Flux를 생성할 수는 있습니다. 하지만 그런 경우 나머지 파이프라인이 블록되지 않도록 하려면 어떻게 해야 할까요?

subscribeOn 메소드를 이용하면 시작부터 매개변수로 전달된 Scheduler에서 시퀀스를 고립시킬 수 있습니다. 예를 들어, Scheduler.elastic() 메소드는 요구되는 크기 만큼의 쓰레드풀을 생성하고 사용되지 않게 되면 자동으로 해당 쓰레드들을 해제합니다. 

첫 번째 예에서, 블록킹 저장소로부터 모든 user들을 천천히 읽기 위해 이 방법을 이용할 것입니다. 이 때 Flux.defer 람다식을 이용하여 저장소에 대한 호출을 감싸야 할 것입니다.

Read from blocking repository
	// TODO 블록킹 저장소로부터 모든 User를 읽기 위해 subscribe될 때까지 조회를 지연(defer)하는 Flux를 생성하고 이것을 elastic scheduler로 실행하라.
	Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
        return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
	} 

데이터베이스에 저장하는 것과 같은 느린 subscriber를 위해서는 작업 시퀀스의 보다 작은 부분만을 고립하기 위해 publishOn 오퍼레이터를 이용합니다.subscribeOn과는 달리 publishOn은 이것 아래의 실행 연쇄에 대해서만 영향을 주며, 이것을 새로운 Scheduler로 전환합니다.

이에 대한 예로서, 저장소에 저장을 하기 위해 doOnNext를 이용할 수 있습니다만, 자신만의 실행 컨텍스트 내에서 저장이 이루어지도록 이 방법을 먼저 이용해야 합니다. 저장 결과가 성공인지 실패인지에만 관심이 있다는 것을 보다 명확하게 표현하기 위해 then() 오퍼레이터를 이용할 수 있으며, 이 오퍼레이터는 Mono<Void>를 리턴하게 될 것입니다.

Write to slow blocking repository
	// TODO 전달된 Flux<User> 내의 user를 elastic scheduler를 이용하여 블록킹 저장소에 저장하고, 처리가 완료되었다는 신호를 알려주는 Mono<Void>를 리턴하라.
	Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
        return flux.publishOn(Schedulers.elastic()).doOnNext(repository::save).then();
	} 

 

Conclusions

지금까지 몇 가지 연습문제를 통해 Reactor의 사용법에 대해 다루어보았습니다. Reactor는 Reactive Streams를 구현하여 쉽고 직관적으로 리액티브 프로그래밍을 할 수 있도록 도와줍니다.

이제 다음으로는 SpringBoot와 Reactor3을 이용하여 게시판 프로그램을 작성할 것입니다. 아직 Java9와 Spring5가 배포되지 않은 상태라 최신 기법을 적용할 수는 없겠지만, 리액티브 프로그래밍을 이해하는 데에 많은 도움이 될 것으로 생각합니다.