카테고리 없음
Reactor 써보기 (1)
머룽
2023. 4. 23. 14:06
오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다.
요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자.
하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다.
위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.)
위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다.
쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다.
Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다.
아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다.
Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다.
이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다.
그럼 이제 본격적으로 모노를 만들어 보자!
kotlin, java 모두 예제에 넣어봤다.
= NullPointerException().toMono()\r
val errorSupplier = { NullPointerException() }.toMono()\r
\r
[/kotlin]
구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.
()\r
val never = Mono.never()\r
\r
[/kotlin]
해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다.
{\r
return Mono.just("foo")\r
.delayElement(Duration.ofSeconds(1))\r
.doOnNext {\r
println(it)\r
}\r
}\r
\r
private fun testMethod2(): Mono {\r
return Mono.just("bar")\r
.doOnNext {\r
println(it)\r
}\r
}\r
\r
[/kotlin]
위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 -> }\r
\r
[/kotlin]
필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.
(NullPointerException()), testMethod1(), Mono.error(IllegalArgumentException()))\r
.subscribe({ }, {\r
println(it)\r
countDownLatch.countDown()\r
}, {\r
countDownLatch.countDown()\r
})\r
\r
countDownLatch.await()\r
}\r
[/kotlin]
위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다.
출력 결과는 다음과 같다.
{\r
client.async(request, object : Listener {\r
override fun response(response: Response) {\r
it.success(response)\r
}\r
\r
override fun failure(e: Exception) {\r
it.error(e)\r
}\r
})\r
}\r
}\r
[/kotlin]
위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다.
만약 리스너 해제 및 자원 해제는 { sink ->\r
it.read(buffer, 0, buffer, object : CompletionHandler {\r
override fun completed(result: Int, attachment: ByteBuffer) {\r
sink.success(attachment)\r
}\r
\r
override fun failed(exc: Throwable, attachment: ByteBuffer) {\r
sink.error(exc)\r
}\r
})\r
}\r
}, { it.close() })\r
\r
}\r
[/kotlin]
조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 > { Mono.just(it) },\r
Function { transition.commit() },\r
BiFunction { it, error -> transition.rollback() },\r
Function { transition.rollback() })\r
}\r
[/kotlin]
예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다.
오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다.
다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.
- reactive-streams-jvm
- 토비의 봄 TV 5회 스프링 리액티브 프로그래밍 (1) - Reactive Streams
- Spring WebFlux는 어떻게 적은 리소스로 많은 트래픽을 감당할까?
- Project Reactor 1.리액티브 프로그래밍
Mono
just
Mono<String> just = Mono.just("hello reactor");
[kotlin]\r
\r
val just = "hello reactor".toMono()\r
\r
[/kotlin]
코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions
디펜더시를 추가 해야 된다.
아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.
fromSupplier
만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.
Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");
[kotlin]\r
\r
val fromSupplier = { "hello reactor" }.toMono()\r
\r
[/kotlin]
from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다.
from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.
error
에러를 만드는 방법이다.
Mono<String> error = Mono.error(new NullPointerException());
Mono<String> errorSupplier = Mono.error(NullPointerException::new);
[kotlin]\r
\r
val error : Monoempty, never
빈 모노와 무기한으로 실행되는 모노를 만든다. 사실never
는 사용한 적이 단 한번도 없다.
Mono<String> empty = Mono.empty();
Mono<String> never = Mono.never();
[kotlin]\r
\r
val empty = Mono.emptynever
는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다.
아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..
zip
모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을aggregating
하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다.
방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.
Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));
[kotlin]\r
\r
val zip = Mono.zip(Mono.just("foo"), Mono.just("bar"))\r
\r
[/kotlin]
기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.
@Test
void zipTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.zip(testMethod1(), testMethod2())
.subscribe(it -> { }, (throwable) -> { },
countDownLatch::countDown);
countDownLatch.await();
}
private Mono<String> testMethod1() {
return Mono.just("foo")
.delayElement(Duration.ofSeconds(1))
.doOnNext(System.out::println);
}
private Mono<String> testMethod2() {
return Mono.just("bar")
.doOnNext(System.out::println);
}
[kotlin]\r
\r
@Test\r
fun `zip test`() {\r
val countDownLatch = CountDownLatch(1)\r
\r
Mono.zip(testMethod1(), testMethod2())\r
.subscribe({}, {}, {\r
countDownLatch.countDown()\r
})\r
\r
countDownLatch.await()\r
}\r
\r
private fun testMethod1(): Monofoo
가 출력된 후에 bar
출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar
가 출력되고 1초후 foo
가 실행 된다.
위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.
Mono.zip(testMethod1(), testMethod2())
.subscribe((Tuple2<String,String> it) -> { });
[kotlin]\r
\r
Mono.zip(testMethod1(), testMethod2())\r
.subscribe { it: Tuple2when
when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.
@Test
void whenTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.when(testMethod1(), testMethod2())
.subscribe(it -> { }, (throwable) -> { },
countDownLatch::countDown);
countDownLatch.await();
}
[kotlin]\r
\r
@Test\r
fun `when test`() {\r
val countDownLatch = CountDownLatch(1)\r
\r
Mono.`when`(testMethod1(), testMethod2())\r
.subscribe({}, {}, {\r
countDownLatch.countDown()\r
})\r
countDownLatch.await()\r
}\r
\r
[/kotlin]
이 역시 zip
과 동일하게 오류를 방출하면 즉시 종료 된다.
delay
메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.
@Test
void delayTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.delay(Duration.ofSeconds(1))
.doOnNext(System.out::println)
.subscribe(it -> { }, (throwable) -> { },
countDownLatch::countDown);
countDownLatch.await();
}
[kotlin]\r
\r
@Test\r
fun `delay test`() {\r
val countDownLatch = CountDownLatch(1)\r
\r
Mono.delay(Duration.ofSeconds(1))\r
.doOnNext {\r
println(it)\r
}.subscribe({}, {}, { countDownLatch.countDown() })\r
\r
countDownLatch.await()\r
}\r
\r
[/kotlin]
위의 코드는 1초 후에 onNext 로 방출한다.
defer
defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.
Mono<String> defer = Mono.defer(() -> Mono.just("foo"));
[kotlin]\r
\r
val defer = Mono.defer { Mono.just("foo") }\r
\r
[/kotlin]
음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty
에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.
Mono.just("foo")
.switchIfEmpty(Mono.just("bar"))
.subscribe();
}
[kotlin]\r
\r
Mono.just("foo")\r
.switchIfEmpty(Mono.just("bar"))\r
.subscribe()\r
\r
[/kotlin]
만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just("bar")를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.
Mono.just("foo")
.switchIfEmpty(Mono.defer(() -> Mono.just("bar")))
.subscribe();
}
[kotlin]\r
\r
Mono.just("foo")\r
.switchIfEmpty { Mono.just("bar") }\r
.subscribe()\r
[/kotlin]
위와 같이 코드를 작성한다면 Mono.just("bar")는 정말로 모노가 비어있을 때만 실행 된다.
from
reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.
Mono.from(Flux.just(1,2,3,4,5))
.subscribe(System.out::println);
[kotlin]\r
\r
Mono.from(Flux.just(1, 2, 3, 4, 5))\r
.subscribe { println(it) }\r
[/kotlin]
결과는 1만 방출 되며 종료 된다.
꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다.
아래는 RxJava 를 사용한 예제이다.
Mono.from(Single.just("bar").toFlowable())
.subscribe(System.out::println);
[kotlin]\r
\r
Mono.from(Single.just("bar").toFlowable())\r
.subscribe {\r
println(it)\r
}\r
[/kotlin]
동일하게 bar 가 방출 되며 종료 된다.
***DelayError
에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.
@Test
void delayErrorTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException()))
.subscribe((it) -> {},
System.out::println,
countDownLatch::countDown);
countDownLatch.await();
}
[kotlin]\r
\r
@Test\r
fun `delay error test`() {\r
val countDownLatch = CountDownLatch(1)\r
\r
Mono.zipDelayError(testMethod1(), Mono.errorfoo
foo
reactor.core.Exceptions$CompositeException: Multiple exceptions
만약 zip을 사용했다면 NullPointerException
에러만 방출 한다.
여기에선 zip 만 설명했지만 when 도 동일하다.
create
Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다. 말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.
@Test
void createTest() {
Mono.create(sink -> {
client.async(request, new Listener() {
@Override
public void onFailure(Exception e) {
sink.error(e);
}
@Override
public void onResponse(Response response) {
sink.success(response);
}
});
});
}
[kotlin]\r
\r
@Test\r
fun `create test`() {\r
Mono.createonDispose
메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel
메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.
using
이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다. 이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다. 사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.
@Test
void usingTest() {
Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ),
it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
sink.success(attachment);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
sink.error(exc);
}
})), it -> {
try {
it.close();
} catch (IOException e) {
}
});
}
[kotlin]\r
\r
@Test\r
fun `using test`() {\r
Mono.using({ AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ) }, {\r
Mono.createusing
를 사용하는 코드를 참고 하면 되겠다.
usingWhen
이 것 역시using
과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher
타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.
@Test
void usingWhenTest() {
Mono<String> data = Mono.just("foo");
Mono.usingWhen(data, it ->
Mono.just(it),
it -> transition.commit(),
(it, error) -> transition.rollback(),
it-> transition.rollback())
}
[kotlin]\r
\r
@Test\r
fun `using when test`() {\r
val data = Mono.just("foo")\r
Mono.usingWhen(data, Function