리액티브 오퍼레이션
Flux와 Mono는 리액터가 제공하는 가장 핵심적인 구성요소이며, 이 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.
Flux와 Mono에는 500개 이상의 오퍼레이션이 있으며 각 오퍼레이션은 다음과 같이 분류된다.
- 생성(Creation) 오퍼레이션
- 조합(Combination) 오퍼레이션
- 변환(Transformation) 오퍼레이션
- 로직(Logic) 오퍼레이션
마블 다이어그램을 보려면 해당 사이트에 가면 볼 수 있다. (https://rxmarbles.com/#from)
1. 생성(Creation) 오퍼레이션
데이터를 생성하여 방출할 때 사용.
객체로부터 생성
Flux나 Mono로 하나 이상의 객체를 생성하려면 just()
메서드를 사용한다..
//flux
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
//mono
Mono<String> orange = Mono.just("Orange");
위 코드처럼 Flux, Mono에서 just() 이용하여 데이터를 생성했지만 Subscribe가 없는데 이 상태는 호스를 수도꼭지에 끼운 것에 비유할 수 있다. 수도꼭지에 끼운 호스에 물을 흐르게 하려면 Subscribe(구독자)
를 이용하여 데이터를 흘러나가게 한다.
Mono.subscribe(), or Flux.subscribe()
//flux
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
fruit.subscribe(f -> System.out.println("Fruit : " + f));
//mono
Mono<String> orange = Mono.just("Orange");
orange.subscribe(f -> System.out.println("Fruit : "+ f));
리액터에서 StepVerifier를 사용하면 Mono, Flux를 테스트할 수 있다.
StepVerifier가 fruit를 구독한 후 이름과 일치한지 검사한다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
StepVerifier.create(fruit)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.verifyComplete();
컬렉션으로부터 생성하기
Flux는 Array fromArray()
, Iterable fromIterable()
, Java Stream fromStream()
을 생성할 수 있다.
String[] fruits = new String[]{"Apple", "Orange", "Grape"};
List<String> frustList = new ArrayList<>(Arrays.asList(fruits));
//array fromArray()
Flux<String> fruitArray = Flux.fromArray(fruits);
StepVerifier.create(fruitArray)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.verifyComplete();
//iterable fromIterable()
Flux<String> fruitList = Flux.fromIterable(frustList);
StepVerifier.create(fruitList)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.verifyComplete();
//streams fromStream()
Flux<String> fruitStreams = Flux.fromStream(fruitList.toStream());
StepVerifier.create(fruitStreams)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.verifyComplete();
Flux 데이터 생성
데이터없이 매번 새 값을 증가하는 숫자를 보내는 카운터 역할의 Flux만 필요할 때 range()
를 사용할 수 있다.
1부터 10까지 증가
Flux<Integer> range = Flux.range(1, 10);
StepVerifier.create(range)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.expectNext(6)
.expectNext(7)
.expectNext(8)
.expectNext(9)
.expectNext(10)
.verifyComplete();
시작 값과 종료 값 대신 값이 방출되는 시간 간격이나 주기를 지정해주는 interval()
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)).take(5);
StepVerifier.create(interval)
.expectNext(0l)
.expectNext(1l)
.expectNext(2l)
.expectNext(3l)
.expectNext(4l)
.verifyComplete();
조합(Combination) 오퍼레이션
두 개의 리액티브 타입을 결합해야 하거나 하나의 Flux를 두 개 이상의 리액티브 타입으로 분할해야하는 경우 사용
리액티브 타입 결합
mergeWith()
: 두 개의 Flux 스트림을 하나의 Flux로 결과를 보여줄 때
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElements(Duration.ofNanos(2));
Flux<String> count = Flux.just("WaterMelon","Melon","Kiwi").delaySubscription(Duration.ofNanos(3)).delayElements(Duration.ofNanos(2));
Flux<String> mergeFlux = fruit.mergeWith(count);
StepVerifier.create(mergeFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("WaterMelon")
.expectNext("Melon")
.expectNext("Kiwi")
.verifyComplete();
일반적으로 Flux는 빠르게 데이터를 방출하므로 Flux 스트림 모두에 delayElements()
를 사용하여 조금 느리게 방출할 수 있다. 그리고 delaySubscription()
을 이용하여 일정 시간이 자난 후구독 및 데이터를 방출하도록 할 수 있다.
mergeWith()은 Flux값들을 완벽하게 번갈아 방출되게 보장할 수 없으므로 zip()
오퍼레이션을 사용할 수 있다.
zip()
은 각 Flux 소스로부터 한 항목씩 번갈아 가져와 새로운 Flux를 생성한다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape");
Flux<Integer> count = Flux.just(1,2,3);
Flux<Tuple2<String, Integer>> zipFlux = Flux.zip(fruit, count);
zipFlux.subscribe(s-> System.out.println(s.getT1() + " : " + s.getT2()));
//or
Flux<String> zipFlux2 = Flux.zip(fruit, count, (f,c) -> f + " : "+ c);
zipFlux2.subscribe(System.out::println);
3. 변환(Transformation) 오퍼레이션
데이터가 흐르는동안 일부 값을 필터링하거나 변경할 경우 사용
리액티브 타입으로부터 데이터 필터링
skip()
: 맨 앞에서부터 원하는 개수의 항목을 무시하는 것.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").skip(2);
StepVerifier.create(fruit)
.expectNext("Grape")
.verifyComplete();
2초동안 기다렸다가 값을 방출
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElement(Duration.ofSeconds(2)).skip(2);
StepVerifier.create(fruit)
.expectNext("Grape")
.verifyComplete();
skip()
은 처음부터 여러개의 항목을 건너뛰는 반면, take()
는 지정된 항목만 방출한다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").take(1);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
.expectNext("Apple")
.verifyComplete();
일정 시간이 경과될 동안 방출
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape").delayElements(Duration.ofSeconds(2)).take(1);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
.expectNext("Apple")
.verifyComplete();
filter()
: Flux를 필터링할 때 사용
Grape가 아닌것만 출력
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
.filter(s->!s.equals("Grape"));
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
.expectNext("Apple")
.expectNext("Orange")
.verifyComplete();
distinct()
를 이용하면 중복값을 제거할 수 있다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape", "Apple")
.distinct();
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.verifyComplete();
리액티브 데이터 매핑
발행된 항목을 다른 형태나 타입으로 매핑하는 방법으로 대표적으로 map()
과 , flatMap()
이 있다.
map()
: 반환을 수행하는 Flux를 생성하며, 동기적으로 매핑이 수행된다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
.map(String::toUpperCase);
fruit.subscribe(System.out::println);
StepVerifier.create(fruit)
.expectNext("APPLE")
.expectNext("ORANGE")
.expectNext("GRAPE")
.verifyComplete();
flatMap()
: 반환을 수행하는 Flux를 생성하며, 비동기적으로 매핑을 수행한다.
mpa()
은 한 객체를 다른 객체로 매핑하는 거지만, flatMap()
은 각 객체를 새로운 Mono, Flux로 매핑하여 새로운 Flux를 만들게된다. flatMap(), subscribeOn()을 함께 사용하여 비동기적으로 수행할 수 있다.
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape")
.flatMap(n -> Mono.just(n).map(String::toUpperCase))
.subscribeOn(Schedulers.parallel());
List<String> fruits = Arrays.asList("APPLE", "ORANGE", "GRAPE");
StepVerifier.create(fruit)
.expectNextMatches(fruits::contains)
.expectNextMatches(fruits::contains)
.expectNextMatches(fruits::contains)
.verifyComplete();
subscribe()
은 실제 구독하는 반면 subscribeOn()
은 이름이 더 서술적이며 구독이 동시적으로 처리되어야 한다.
Schedulers
는 다음과 같은 메서드를 가지고 있다.
Schedulers Method | sodyd |
---|---|
.immediate() | 현재 스레드에서 구독을 실행. |
.single() | 재사용 가능한 단일 스레드에서 구독을 실행. 모든 호출자에 대해 동일한 스레드를 재사용 |
.newSingel() | 매 호출마다 전용 스레드에서 구독을 실행. |
.elastic() | 무한하고 신축성 있는 풀에서 가져온 작업 스레드에서 구독을 실행. 필요 시 새로운 작업 스레드가 생성되며, 유후 스레드는 제거(default: 60초) |
.parallel() | 고정된 크기의 풀에서 가져온 작업 스레드에서 구독을 실행하며, CPU코어의 개수가 크기가 된다. |
리엑티브 스트림의 데이터 버퍼링 하기
buffer()
: 데이터를 처리하는 동안 데이터 스트림을 작은 덩어리로 분할
Flux<String> fruit = Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana");
Flux<List<String>> bufferFlux = fruit.buffer(3);
bufferFlux.subscribe(System.out::println);
StepVerifier.create(bufferFlux)
.expectNext(Arrays.asList("Apple", "Orange", "Grape"))
.expectNext(Arrays.asList("Strawberry", "Banana"))
.verifyComplete();
buffer()
를 flatMap()
과 같이 사용하여 병행으로 처리
Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana")
.buffer(3)
.flatMap(s -> Flux.fromIterable(s)
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.log())
.subscribe();
5개의 값을 새로운 Flux로 버퍼링 하여 flatMap()
에 적용한다. 각 List의 버퍼를 가져와서 해당요소로부터 새로운 Flux를 생성하고 map()
을 사용한다. 버퍼링 된 List는 별도의 스레드에서 병행으로 계속 처리될 수 있다.
log()
: 모든 리액티브 시트림 이벤트를 로깅하여 실제 어떻게 처리되는지 파악할 수 있다.
collectList()
: Flux가 발행한 모든 항목을 포함하는 List를 방출하고 새로운 Fluxfmf todtjd
Mono<List<String>> list = Flux.just("Apple", "Orange", "Grape","Strawberry", "Banana").collectList();
StepVerifier.create(list)
.expectNext(Arrays.asList("Apple", "Orange", "Grape","Strawberry", "Banana"))
.verifyComplete();
4. 로직(Logic) 오퍼레이션
Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지 알아야 할 경우 사용.
all()
: 모든 메시지가 조건을 충족하는지 확인Flux<String> fruitFlux = Flux.just("apple", "orange", "grape","strawberry", "banana"); Mono<Boolean> hasFruit = fruitFlux.all(a-> a.contains("a")); StepVerifier.create(hasFruit) .expectNext(true) .verifyComplete();
any()
: 최소 하나의 메시지가 조건을 충족하는지 확인
Flux<String> fruitFlux = Flux.just("apple", "orange", "grape","strawberry", "banana");
Mono<Boolean> hasFruit = fruitFlux.any(a-> a.contains("orange"));
StepVerifier.create(hasFruit)
.expectNext(true)
.verifyComplete();
참조
'Spring' 카테고리의 다른 글
[Srping] Webflux (1) | 2023.05.02 |
---|---|
Spring Reactor 시작하기 (0) | 2023.05.02 |
[Spring] Reactive Stream 개요 (0) | 2023.05.02 |
@EventListener를 사용한 발행-구독 패턴 (0) | 2023.05.02 |
AOP(Aspect Oriented Programming) (0) | 2023.03.31 |