ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Reactor 써보기 (1)
    카테고리 없음 2023. 4. 23. 14:06
    오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다. 요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자. 하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다. 위의 글들은 한번 읽어보면 좋을 것 같다. 토비느님의 방송 역시 함께 보면 개념적으로 더욱 이해가 빨리 될 듯 싶다. 꽤 많은 시리즈가 있으니 시간 날 때 짬짬이 보면 도움이 확실이 된다. 사실 Reactive Streams 구현체인 Reactor나 RxJava2나 사용법은 대부분 비슷하다. 같은 개발자가 만든건 아니지만 여러 회사들이 협업을 하면서 만든거니 아무래도 비슷할 수 밖에 없을 것도 같다. 아마도 구현체들 대부분 사용법은 비슷하지 않을까 싶다? 필자는 아무래도 거의 대부분 Spring을 사용하기에 Reactor로 먼저 접하게 되었다. 안드로이드에서는 Reactor 보다는 RxJava를 더 선호하고 많이 쓰는 것 같다. 하지만 상관없다. Spring 에선 RxJava 나 Reactor 나 혹은 다른 Reactive Streams 구현 된 어느 것을 써도 좋다. 이건 조만간 다시 한번 다뤄보기로 하겠다. 그럼 한번 어떤 메서드들이 있는지 주로 사용될만한 메서드 위주로 한번 살펴보도록 하자. 일단 모노 부터 만들어보자!

    Mono

    위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.) 위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다. 쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다. Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다. 아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다. Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다. 이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다. 그럼 이제 본격적으로 모노를 만들어 보자! kotlin, java 모두 예제에 넣어봤다.

    just


    Mono<String> just = Mono.just("hello reactor");
    [kotlin]\r \r val just = "hello reactor".toMono()\r \r [/kotlin]
    코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions 디펜더시를 추가 해야 된다.
    아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.

    fromSupplier

    만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.

    Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");
    [kotlin]\r \r val fromSupplier = { "hello reactor" }.toMono()\r \r [/kotlin] from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다. from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.

    error

    에러를 만드는 방법이다.

    Mono<String> error = Mono.error(new NullPointerException()); Mono<String> errorSupplier = Mono.error(NullPointerException::new);
    [kotlin]\r \r val error : Mono = NullPointerException().toMono()\r val errorSupplier = { NullPointerException() }.toMono()\r \r [/kotlin] 구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.

    empty, never

    빈 모노와 무기한으로 실행되는 모노를 만든다. 사실 never는 사용한 적이 단 한번도 없다.

    Mono<String> empty = Mono.empty(); Mono<String> never = Mono.never();
    [kotlin]\r \r val empty = Mono.empty()\r val never = Mono.never()\r \r [/kotlin] 해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다. never 는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다. 아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..

    zip

    모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을 aggregating 하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다. 방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.

    Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));
    [kotlin]\r \r val zip = Mono.zip(Mono.just("foo"), Mono.just("bar"))\r \r [/kotlin] 기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.

    @Test void zipTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zip(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); } private Mono<String> testMethod1() { return Mono.just("foo") .delayElement(Duration.ofSeconds(1)) .doOnNext(System.out::println); } private Mono<String> testMethod2() { return Mono.just("bar") .doOnNext(System.out::println); }
    [kotlin]\r \r @Test\r fun `zip test`() {\r val countDownLatch = CountDownLatch(1)\r \r Mono.zip(testMethod1(), testMethod2())\r .subscribe({}, {}, {\r countDownLatch.countDown()\r })\r \r countDownLatch.await()\r }\r \r private fun testMethod1(): Mono {\r return Mono.just("foo")\r .delayElement(Duration.ofSeconds(1))\r .doOnNext {\r println(it)\r }\r }\r \r private fun testMethod2(): Mono {\r return Mono.just("bar")\r .doOnNext {\r println(it)\r }\r }\r \r [/kotlin] 위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 foo 가 출력된 후에 bar 출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar가 출력되고 1초후 foo가 실행 된다. 위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.

    Mono.zip(testMethod1(), testMethod2()) .subscribe((Tuple2<String,String> it) -> { });
    [kotlin]\r \r Mono.zip(testMethod1(), testMethod2())\r .subscribe { it: Tuple2 -> }\r \r [/kotlin] 필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.

    when

    when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.

    @Test void whenTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.when(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }
    [kotlin]\r \r @Test\r fun `when test`() {\r val countDownLatch = CountDownLatch(1)\r \r Mono.`when`(testMethod1(), testMethod2())\r .subscribe({}, {}, {\r countDownLatch.countDown()\r })\r countDownLatch.await()\r }\r \r [/kotlin] 이 역시 zip과 동일하게 오류를 방출하면 즉시 종료 된다.

    delay

    메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.

    @Test void delayTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.delay(Duration.ofSeconds(1)) .doOnNext(System.out::println) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }
    [kotlin]\r \r @Test\r fun `delay test`() {\r val countDownLatch = CountDownLatch(1)\r \r Mono.delay(Duration.ofSeconds(1))\r .doOnNext {\r println(it)\r }.subscribe({}, {}, { countDownLatch.countDown() })\r \r countDownLatch.await()\r }\r \r [/kotlin] 위의 코드는 1초 후에 onNext 로 방출한다.

    defer

    defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.

    Mono<String> defer = Mono.defer(() -> Mono.just("foo"));
    [kotlin]\r \r val defer = Mono.defer { Mono.just("foo") }\r \r [/kotlin] 음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty 에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.

    Mono.just("foo") .switchIfEmpty(Mono.just("bar")) .subscribe(); }
    [kotlin]\r \r Mono.just("foo")\r .switchIfEmpty(Mono.just("bar"))\r .subscribe()\r \r [/kotlin] 만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just("bar")를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.

    Mono.just("foo") .switchIfEmpty(Mono.defer(() -> Mono.just("bar"))) .subscribe(); }
    [kotlin]\r \r Mono.just("foo")\r .switchIfEmpty { Mono.just("bar") }\r .subscribe()\r [/kotlin] 위와 같이 코드를 작성한다면 Mono.just("bar")는 정말로 모노가 비어있을 때만 실행 된다.

    from

    reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.

    Mono.from(Flux.just(1,2,3,4,5)) .subscribe(System.out::println);
    [kotlin]\r \r Mono.from(Flux.just(1, 2, 3, 4, 5))\r .subscribe { println(it) }\r [/kotlin] 결과는 1만 방출 되며 종료 된다. 꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다. 아래는 RxJava 를 사용한 예제이다.

    Mono.from(Single.just("bar").toFlowable()) .subscribe(System.out::println);
    [kotlin]\r \r Mono.from(Single.just("bar").toFlowable())\r .subscribe {\r println(it)\r }\r [/kotlin] 동일하게 bar 가 방출 되며 종료 된다.

    ***DelayError

    에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.

    @Test void delayErrorTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException())) .subscribe((it) -> {}, System.out::println, countDownLatch::countDown); countDownLatch.await(); }
    [kotlin]\r \r @Test\r fun `delay error test`() {\r val countDownLatch = CountDownLatch(1)\r \r Mono.zipDelayError(testMethod1(), Mono.error(NullPointerException()), testMethod1(), Mono.error(IllegalArgumentException()))\r .subscribe({ }, {\r println(it)\r countDownLatch.countDown()\r }, {\r countDownLatch.countDown()\r })\r \r countDownLatch.await()\r }\r [/kotlin] 위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다. 출력 결과는 다음과 같다.
    foo
    foo
    reactor.core.Exceptions$CompositeException: Multiple exceptions
    
    만약 zip을 사용했다면 NullPointerException 에러만 방출 한다. 여기에선 zip 만 설명했지만 when 도 동일하다.

    create

    Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다. 말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.

    @Test void createTest() { Mono.create(sink -> { client.async(request, new Listener() { @Override public void onFailure(Exception e) { sink.error(e); } @Override public void onResponse(Response response) { sink.success(response); } }); }); }
    [kotlin]\r \r @Test\r fun `create test`() {\r Mono.create {\r client.async(request, object : Listener {\r override fun response(response: Response) {\r it.success(response)\r }\r \r override fun failure(e: Exception) {\r it.error(e)\r }\r })\r }\r }\r [/kotlin] 위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다. 만약 리스너 해제 및 자원 해제는 onDispose 메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel 메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.

    using

    이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다. 이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다. 사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.

    @Test void usingTest() { Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ), it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { sink.success(attachment); } @Override public void failed(Throwable exc, ByteBuffer attachment) { sink.error(exc); } })), it -> { try { it.close(); } catch (IOException e) { } }); }
    [kotlin]\r \r @Test\r fun `using test`() {\r Mono.using({ AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ) }, {\r Mono.create { sink ->\r it.read(buffer, 0, buffer, object : CompletionHandler {\r override fun completed(result: Int, attachment: ByteBuffer) {\r sink.success(attachment)\r }\r \r override fun failed(exc: Throwable, attachment: ByteBuffer) {\r sink.error(exc)\r }\r })\r }\r }, { it.close() })\r \r }\r [/kotlin] 조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 using를 사용하는 코드를 참고 하면 되겠다.

    usingWhen

    이 것 역시 using과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher 타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.
    @Test
    void usingWhenTest() {
        Mono<String> data = Mono.just("foo");
        Mono.usingWhen(data, it ->
                Mono.just(it),
                it -> transition.commit(),
                (it, error) ->  transition.rollback(),
                it-> transition.rollback())
    }
    
    [kotlin]\r \r @Test\r fun `using when test`() {\r val data = Mono.just("foo")\r Mono.usingWhen(data, Function> { Mono.just(it) },\r Function { transition.commit() },\r BiFunction { it, error -> transition.rollback() },\r Function { transition.rollback() })\r }\r [/kotlin] 예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다. 오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다. 다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.

    댓글

Designed by Tistory.