거꾸로 바라본 세상
반응형

리액티브 오퍼레이션

Flux와 Mono는 리액터가 제공하는 가장 핵심적인 구성요소이며, 이 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.

Flux와 Mono에는 500개 이상의 오퍼레이션이 있으며 각 오퍼레이션은 다음과 같이 분류된다.

  • 생성(Creation) 오퍼레이션
  • 조합(Combination) 오퍼레이션
  • 변환(Transformation) 오퍼레이션
  • 로직(Logic) 오퍼레이션

마블 다이어그램을 보려면 해당 사이트에 가면 볼 수 있다. (https://rxmarbles.com/#from)

1. 생성(Creation) 오퍼레이션

데이터를 생성하여 방출할 때 사용.

객체로부터 생성

Flux나 Mono로 하나 이상의 객체를 생성하려면 just() 메서드를 사용한다..

//flux
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");

//mono

Mono<String> orange = Mono.just("Orange");

위 코드처럼 Flux, Mono에서 just() 이용하여 데이터를 생성했지만 Subscribe가 없는데 이 상태는 호스를 수도꼭지에 끼운 것에 비유할 수 있다. 수도꼭지에 끼운 호스에 물을 흐르게 하려면 Subscribe(구독자)를 이용하여 데이터를 흘러나가게 한다.

Mono.subscribe(), or Flux.subscribe()

//flux
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
fruit.subscribe(f -> System.out.println("Fruit : " + f));
//mono
Mono<String> orange = Mono.just("Orange");
orange.subscribe(f -> System.out.println("Fruit : "+  f));

리액터에서 StepVerifier를 사용하면 Mono, Flux를 테스트할 수 있다.

StepVerifier가 fruit를 구독한 후 이름과 일치한지 검사한다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
StepVerifier.create(fruit)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .verifyComplete();

컬렉션으로부터 생성하기

Flux는 Array fromArray(), Iterable fromIterable(), Java Stream fromStream()을 생성할 수 있다.

String[] fruits = new String[]{"Apple", "Orange", "Grape"};
List<String> frustList = new ArrayList<>(Arrays.asList(fruits));

//array fromArray()
Flux<String> fruitArray = Flux.fromArray(fruits);
StepVerifier.create(fruitArray)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .verifyComplete();

//iterable fromIterable()
Flux<String> fruitList = Flux.fromIterable(frustList);
StepVerifier.create(fruitList)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .verifyComplete();

//streams fromStream()
Flux<String> fruitStreams = Flux.fromStream(fruitList.toStream());
StepVerifier.create(fruitStreams)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .verifyComplete();

Flux 데이터 생성

데이터없이 매번 새 값을 증가하는 숫자를 보내는 카운터 역할의 Flux만 필요할 때 range()를 사용할 수 있다.

1부터 10까지 증가

Flux<Integer> range = Flux.range(1, 10);
StepVerifier.create(range)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectNext(4)
        .expectNext(5)
        .expectNext(6)
        .expectNext(7)
        .expectNext(8)
        .expectNext(9)
        .expectNext(10)
        .verifyComplete();

시작 값과 종료 값 대신 값이 방출되는 시간 간격이나 주기를 지정해주는 interval()

Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)).take(5);
StepVerifier.create(interval)
        .expectNext(0l)
        .expectNext(1l)
        .expectNext(2l)
        .expectNext(3l)
        .expectNext(4l)
        .verifyComplete();

조합(Combination) 오퍼레이션

두 개의 리액티브 타입을 결합해야 하거나 하나의 Flux를 두 개 이상의 리액티브 타입으로 분할해야하는 경우 사용

리액티브 타입 결합

mergeWith() : 두 개의 Flux 스트림을 하나의 Flux로 결과를 보여줄 때

 Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElements(Duration.ofNanos(2));
Flux<String> count = Flux.just("WaterMelon","Melon","Kiwi").delaySubscription(Duration.ofNanos(3)).delayElements(Duration.ofNanos(2));

Flux<String> mergeFlux = fruit.mergeWith(count);

StepVerifier.create(mergeFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("WaterMelon")
        .expectNext("Melon")
        .expectNext("Kiwi")
        .verifyComplete();

일반적으로 Flux는 빠르게 데이터를 방출하므로 Flux 스트림 모두에 delayElements()를 사용하여 조금 느리게 방출할 수 있다. 그리고 delaySubscription()을 이용하여 일정 시간이 자난 후구독 및 데이터를 방출하도록 할 수 있다.

mergeWith()은 Flux값들을 완벽하게 번갈아 방출되게 보장할 수 없으므로 zip() 오퍼레이션을 사용할 수 있다.

zip()은 각 Flux 소스로부터 한 항목씩 번갈아 가져와 새로운 Flux를 생성한다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
Flux<Integer> count = Flux.just(1,2,3);

Flux<Tuple2<String, Integer>> zipFlux = Flux.zip(fruit, count);
zipFlux.subscribe(s-> System.out.println(s.getT1() + " : " + s.getT2()));

//or

Flux<String> zipFlux2 = Flux.zip(fruit, count, (f,c) -> f + " : "+ c);
zipFlux2.subscribe(System.out::println);

3. 변환(Transformation) 오퍼레이션

데이터가 흐르는동안 일부 값을 필터링하거나 변경할 경우 사용

리액티브 타입으로부터 데이터 필터링

skip() : 맨 앞에서부터 원하는 개수의 항목을 무시하는 것.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").skip(2);
StepVerifier.create(fruit)
        .expectNext("Grape")
        .verifyComplete();

2초동안 기다렸다가 값을 방출

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElement(Duration.ofSeconds(2)).skip(2);
StepVerifier.create(fruit)
        .expectNext("Grape")
        .verifyComplete();

skip()은 처음부터 여러개의 항목을 건너뛰는 반면, take()는 지정된 항목만 방출한다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").take(1);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
            .expectNext("Apple")
            .verifyComplete();

일정 시간이 경과될 동안 방출

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElements(Duration.ofSeconds(2)).take(1);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
        .expectNext("Apple")
        .verifyComplete();

filter() : Flux를 필터링할 때 사용

Grape가 아닌것만 출력

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
        .filter(s->!s.equals("Grape"));
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
        .expectNext("Apple")
        .expectNext("Orange")
        .verifyComplete();

distinct()를 이용하면 중복값을 제거할 수 있다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape", "Apple")
        .distinct();
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .verifyComplete();

리액티브 데이터 매핑

발행된 항목을 다른 형태나 타입으로 매핑하는 방법으로 대표적으로 map()과 , flatMap()이 있다.

map() : 반환을 수행하는 Flux를 생성하며, 동기적으로 매핑이 수행된다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
        .map(String::toUpperCase);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
        .expectNext("APPLE")
        .expectNext("ORANGE")
        .expectNext("GRAPE")
        .verifyComplete();

flatMap() : 반환을 수행하는 Flux를 생성하며, 비동기적으로 매핑을 수행한다.

mpa()은 한 객체를 다른 객체로 매핑하는 거지만, flatMap()은 각 객체를 새로운 Mono, Flux로 매핑하여 새로운 Flux를 만들게된다. flatMap(), subscribeOn()을 함께 사용하여 비동기적으로 수행할 수 있다.

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
        .flatMap(n -> Mono.just(n).map(String::toUpperCase))
        .subscribeOn(Schedulers.parallel());

List<String> fruits = Arrays.asList("APPLE", "ORANGE", "GRAPE");

StepVerifier.create(fruit)
        .expectNextMatches(fruits::contains)
        .expectNextMatches(fruits::contains)
        .expectNextMatches(fruits::contains)
        .verifyComplete();

subscribe()은 실제 구독하는 반면 subscribeOn()은 이름이 더 서술적이며 구독이 동시적으로 처리되어야 한다.

Schedulers는 다음과 같은 메서드를 가지고 있다.

Schedulers Method sodyd
.immediate() 현재 스레드에서 구독을 실행.
.single() 재사용 가능한 단일 스레드에서 구독을 실행. 모든 호출자에 대해 동일한 스레드를 재사용
.newSingel() 매 호출마다 전용 스레드에서 구독을 실행.
.elastic() 무한하고 신축성 있는 풀에서 가져온 작업 스레드에서 구독을 실행. 필요 시 새로운 작업 스레드가 생성되며, 유후 스레드는 제거(default: 60초)
.parallel() 고정된 크기의 풀에서 가져온 작업 스레드에서 구독을 실행하며, CPU코어의 개수가 크기가 된다.

리엑티브 스트림의 데이터 버퍼링 하기

buffer() : 데이터를 처리하는 동안 데이터 스트림을 작은 덩어리로 분할

Flux<String> fruit = Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana");

Flux<List<String>> bufferFlux = fruit.buffer(3);

bufferFlux.subscribe(System.out::println);

StepVerifier.create(bufferFlux)
        .expectNext(Arrays.asList("Apple", "Orange", "Grape"))
        .expectNext(Arrays.asList("Strawberry", "Banana"))
        .verifyComplete();

buffer()flatMap()과 같이 사용하여 병행으로 처리

Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana")
            .buffer(3)
            .flatMap(s -> Flux.fromIterable(s)
                    .map(String::toUpperCase)
                    .subscribeOn(Schedulers.parallel())
                    .log())
            .subscribe();

5개의 값을 새로운 Flux로 버퍼링 하여 flatMap()에 적용한다. 각 List의 버퍼를 가져와서 해당요소로부터 새로운 Flux를 생성하고 map() 을 사용한다. 버퍼링 된 List는 별도의 스레드에서 병행으로 계속 처리될 수 있다.

log() : 모든 리액티브 시트림 이벤트를 로깅하여 실제 어떻게 처리되는지 파악할 수 있다.

collectList() : Flux가 발행한 모든 항목을 포함하는 List를 방출하고 새로운 Fluxfmf todtjd

 Mono<List<String>> list = Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana").collectList();
StepVerifier.create(list)
        .expectNext(Arrays.asList("Apple", "Orange", "Grape","Strawberry", "Banana"))
        .verifyComplete();

4. 로직(Logic) 오퍼레이션

Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지 알아야 할 경우 사용.

  • all() : 모든 메시지가 조건을 충족하는지 확인
    Flux<String> fruitFlux = Flux.just("apple", "orange", "grape","strawberry", "banana");
    Mono<Boolean> hasFruit = fruitFlux.all(a-> a.contains("a"));
    StepVerifier.create(hasFruit)
          .expectNext(true)
          .verifyComplete();
  • any() : 최소 하나의 메시지가 조건을 충족하는지 확인
Flux<String> fruitFlux = Flux.just("apple", "orange", "grape","strawberry", "banana");
Mono<Boolean> hasFruit = fruitFlux.any(a-> a.contains("orange"));
StepVerifier.create(hasFruit)
        .expectNext(true)
        .verifyComplete();

 

참조

https://projectreactor.io/docs/core/release/reference/

반응형

'Spring' 카테고리의 다른 글

[Srping] Webflux  (1) 2023.05.02
Spring Reactor 시작하기  (0) 2023.05.02
[Spring] Reactive Stream 개요  (0) 2023.05.02
@EventListener를 사용한 발행-구독 패턴  (0) 2023.05.02
AOP(Aspect Oriented Programming)  (0) 2023.03.31
profile

거꾸로 바라본 세상

@란지에。

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!