카테고리 없음
Reactor 써보기 (3)
머룽
2023. 4. 23. 14:06
Reactor를 거의 일년만에 다시 작성한다. 요즘 영 귀찮아서 블로그를 잘 안썼더니 올해 처음으로 작성한다.
->\r
if (number == 10) {\r
sink.complete()\r
} else {\r
sink.next(number)\r
}\r
number + 1\r
}).subscribe {\r
println(it)\r
}\r
}\r
[/kotlin]
위의 코드는 1 부터 9까지 생성하는 Flux 이다. 첫번째 파라미터는 최초값이며 두번째 파라미터는 BiFunction을 받고 있으며 현재 상태를 사용하여 신호를 보낸후 다음 상태의 값을 반환한다. 내부 상태값을 갖고 있으니 상태에 맞게 next, complete, error 신호를 발생시키면 된다.
만약 무한으로 신호를 보내고 싶다면 complete, error 신호를 보내지 않으면 된다.
Flux
just, fromIterable, fromStream, range
just는 Mono 에서도 배웠다. 동일하게 Flux에서도 just를 통해 Flux를 생성할 수 있다.@Test
void fluxJustTest() {
Flux.just(1, 2, 3, 4, 5)
.subscribe(System.out::println);
}
[kotlin]\r
@Test\r
fun `flux just test`() {\r
Flux.just(1, 2, 3, 4, 5)\r
.subscribe {\r
println(it)\r
}\r
}\r
[/kotlin]
Mono 와는 조금 다르게 가변인자 파라미터를 받는다. 0 부터 N까지의 스트림을 만들수 있기 때문이다. 실행해보면 1 ~ 5까지 숫자가 출력되는 것을 볼수 있다.
여러개의 엘리먼트를 생성할 수 있으니 Iterable
를 받아 생성할 수 도 있다.
@Test
void fluxFromIterableTest() {
Flux.fromIterable(List.of(1, 2, 3, 4, 5))
.subscribe(System.out::println);
}
[kotlin]\r
@Test\r
fun `flux fromIterable test`() {\r
listOf(1, 2, 3, 4, 5).toFlux()\r
.subscribe {\r
println(it)\r
}\r
}\r
[/kotlin]
또한 java8의 Stream
을 받아 생성할 수 도 있다.
@Test
void fluxStreamTest() {
Flux.fromStream(Stream.of(1, 2, 3, 4, 5))
.subscribe(System.out::println);
}
[kotlin]\r
@Test\r
fun `flux stream test`() {\r
Stream.of(1, 2, 3, 4, 5).toFlux()\r
.subscribe {\r
println(it)\r
}\r
}\r
[/kotlin]
위 처럼 작성해도 1부터5까지의 숫자가 출력된다.
어떤 범위를 표현하고 싶다면 range
를 사용하면 된다.
@Test
void fluxRangeTest() {
Flux.range(1, 10)
.subscribe(System.out::println);
}
}
[kotlin]\r
@Test\r
fun `flux range test`() {\r
Flux.range(1, 10)\r
.subscribe {\r
println(it)\r
}\r
}\r
\r
[/kotlin]
첫번쨰 파라미터는 시작값이고 두번째 파라미터는 갯수를 의미한다.
위 내용을 실행해보면 1부터 10까지 숫자가 출력된다.
여기까지는 가장 기본적인 Flux의 생성법을 알아봤다. 대부분 위의 내용을 알고 있을 듯 싶다. 구글링해보면 대부분 위의 내용이 잘 표현되고 있으니 말이다.
우리는 좀 더 많은 내용을 다뤄보기로 하자.
concat, merge
concat
과 merge
은 거의 동일하게 여러 Publisher를 합쳐 내보낸다.
하지만 조금 다른 부분이 있는데 concat은 순서대로 동작하며 이전 Publisher 먼저 구독하고 완료된 후에 다음 Publisher 가 동작한다.
@Test
void fluxConcatTest() throws InterruptedException {
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(5);
}
[kotlin]\r
@Test\r
fun `flux concat test`() {\r
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))\r
.subscribe {\r
println(it)\r
}\r
\r
TimeUnit.SECONDS.sleep(5)\r
}\r
[/kotlin]
위의 코드는 1 ~ 4까지 1초간격으로 출력되고 5 ~ 6까지는 이전 Publisher(1~4까지)가 완료된후에 한꺼번에 출력된다.
merge의 경우에는 순서가 없으며 다른 Publisher의 영향도 없다.
@Test
void fluxMergeTest() throws InterruptedException{
Flux.merge(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(5);
}
[kotlin]\r
@Test\r
fun `flux merge test`() {\r
Flux.merge(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))\r
.subscribe {\r
println(it)\r
}\r
\r
TimeUnit.SECONDS.sleep(5)\r
}\r
[/kotlin]
위는 첫번째 Publisher와 상관없이 두번째 Publisher 동시에 구독한다. 첫번째 Publisher에게 delay를 주어서 5~8까지 먼저 출력 후 1~4가 다음에 출력된다.
만약 순서가 중요하다면 concat을 순서가 상관없다면 merge를 이용하면 된다.
concat은 순서대로 구독을한다.
만약 Publisher의 순서가 상관없고 마지막 시퀀스만 순서대로 방출하려면 어떻게 해야 될까?
Publisher가 빨라도 상관없다면 mergeSequential
를 사용하면 된다. Publisher는 동시에 구독하지만 마지막 시퀀스는 구독 순서대로 방출한다.
동시라고 표현했지만 구독은 순서대로 구독한다. 그 차이가 거의 동시이기에 그렇게 표현했다.
mergeSequential
concat
과 merge
를 조금 합쳐놓은듯한 느낌이다. Publisher는 순서와 상관없이 모두 동시에 구독하며 마지막 시퀀스는 구독 순서대로 방출한다.
[kotlin]\r
@Test\r
fun `flux mergeSequential test`() {\r
Flux.mergeSequential(\r
Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1))\r
.doOnNext { println("first : $it") },\r
Flux.just(5, 6, 7, 8)\r
.doOnNext { println("second : $it") })\r
.subscribe {\r
println(it)\r
}\r
\r
TimeUnit.SECONDS.sleep(5)\r
}\r
[/kotlin]
위의 코드를 실행하면 어떻게 출력될까? 한번 merge
경우에도 어떻게 출력될지 상상해보도록 하자.
Publisher는 순서와 상관없이 동시에 구독하지만 마지막 시퀀스는 순서를 보장한다.
그래서 5~8까지 doOnNext의 로그가 출력되고 1~4까지의 doOnNext와 마지막 시퀀스가 방출된다.
second : 5
second : 6
second : 7
second : 8
first : 1
1
first : 2
2
first : 3
3
first : 4
4
5
6
7
8
위는 mergeSequential
의 결과이다. 마지막 시퀀스는 순서가 보장되었다.
mergeOrdered
merge의 종류도 많다. 사실 이건 언제 사용할지도 감이 잘 오지 않는다. 뭐 하다보면 생길지도 모르겠지만.. 엘리먼트 순서와 관련이 있다. 일단 기본은 오름차순인데 내림차순으로 변경도 할수 있다. Publisher들의 엘리먼트들 순서대로 가장 작은 엘리먼트를 비교하여 방출한다. 말이 어렵구만. 한번 예제를 보자. [kotlin]\r @Test\r fun `flux mergeOrdered test`() {\r Flux.mergeOrdered(Flux.just(9, 6, 11, 3), Flux.just(2, 10, 1, 4))\r .subscribe {\r println(it)\r }\r }\r [/kotlin] 예를들어 위의 두개의 Publisher 실행한다고 해보자. 9-2 를 비교하여 작은 엘리먼트를 방출한다. 2가 방출되었으니 같은 Publisher에 있는 2 다음의 10과 9를 비교하여 작은 엘리먼트인 9를 방출한다. 다음 9가 방출되었으니 6과 10을 비교하여 작은 엘리먼트인 6이 방출된다. 6이 방출되었으니 같은 Publisher에 있는 11과 10을 비교하여 작은 엘리먼트인 10을 방출한다. 이런식으로 모든 엘리먼트를 방출한다.2
9
6
10
1
4
11
3
물론 필자는 숫자로 했지만 Comparable를 구현한 클래스라면 뭐든 가능하다.
combineLatest
이거 역시 말로 설명하기 힘들다. 메서드명과 같이 마지막으로 결합되는 엘리먼트를 방출하는 역할을 하는 메서드이다. 먼저 예제를 보자.@Test
void fluxCombineLatestTest() throws InterruptedException {
Flux.combineLatest(Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80)), Flux.just(4, 5, 6).delayElements(Duration.ofMillis(100)), (a, b) -> a + ", " + b)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(5);
}
[kotlin]\r
@Test\r
fun `flux combineLatest test`() {\r
Flux.combineLatest(Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80)), Flux.just(4, 5, 6).delayElements(Duration.ofMillis(100))) { a, b ->\r
"$a, $b"\r
}.subscribe {\r
println(it)\r
}\r
TimeUnit.SECONDS.sleep(5)\r
}\r
[/kotlin]
위에서 말했다시피 마지막으로 결합되는 즉시 방출한다.
예를들어 첫번째 Publisher는 80밀리세컨드
로 delay를 시켰고 두번째 Publisher는 100밀리세컨드
로 delay를 주었다.
1이 80ms 뒤에 방출되었지만 4가 20ms동안 더 대기를 해야되므로 100ms 뒤에 1, 4 가 출력된다.
그 후 2는 60ms 뒤에 방출되므로 2, 4 가 출력된다.
다음은 40ms 뒤에 두번째 Publisher에 있는 5가 방출 되어 2, 5가 출력된다.
이런식으로 마지막으로 결합되는 즉시 방출한다. 시원찮지만 대충 그림을 보면 아래와 같다.
80ms 100ms 160ms 200ms 240ms
======== 1 ======== 2 ==== ==== 3
========== 4 ====== ==== 5 ==== ====== 6
========== 1 ====== 2 ==== 2 ==== 3 ====== 3
4 4 5 5 6
generate
generate는 신호를 차례대로 생성하는 메서드이다. 일단 코드를 보자.@Test
void fluxGenerateTest() {
Flux.generate(() -> 1, (number, sink) -> {
if (number == 10) {
sink.complete();
} else {
sink.next(number);
}
return number + 1;
}).subscribe(System.out::println);
}
[kotlin]\r
@Test\r
fun `flux generate test`() {\r
Flux.generate({ 1 }, { number, sink: SynchronousSinkgenerate vs create
Flux 의 create는 설명하지 않았지만 Mono의 create와 사용법은 동일하기에 생략했다. (멀티스레드를 제외한 기능은 동일하다) generate와 create의 차이점을 살펴보자.- generate는 내부에 상태값이 있으며 create는 상태값이 없다.
- generate는 next를 한번만 호출해야되는 반면 create는 여러번 호출해도 된다.
- generate 동기식프로그램만 가능하지만 create는 비동기 멀티스레드 프로그래밍이 가능하다.
create
와 push
를 비교하는게 좀 더 나아 보인다.
좀 더 자세한 내용은 문서를 보는 것을 추천한다. 그래야 이해가 더 빠를 것 같다.
또한 실제로 직접 사용해보고 예제 코드들을 살펴봐야 좀 더 도움이 될 것이다.
오늘 이렇게 Flux 생성에 대해서 알아봤다. 좀 더 많은 사용법이 있지만 이전글 Mono 생성하기에서 겹치는 부분도 있고 필자가 써보지 않았던 메서드들을 작성하지 않았다.
다음시간에는 Flux 오퍼레이터에 대해 알아보도록 하자.