Spring Reactor
리액티브 프로그래밍은 일련의 작업 단계를 기술하는 것이 아니라 데이터가 전달될 파이프라인을 구성하여 데이터가 전달되는 동안 어떤 형태로든 변경 또는 사용되는 것.
사람의 이름을 가져와 대문자로 변경 후 출력
<명령형 코드>
String name = "devPaik";
String capitalName = name.toUpperCase();
String greeting = "Hello, "+ capitalName + "!";
System.out.println(greeting);
<리엑티브 코드>
Mono.just("devPaik")
.map(n -> n.toUpperCase())
.map(cn -> "Hello," + cn + "!")
.subscribe(System.out::println);
위와 같이 리엑티브 코드는 데이터가 파이프라인으로 구성하는 것을 볼 수 있다.
파이프라인의 각각의 단계에서 어떻게 하던 데이터는 변경되고, 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.
리액터에는 Mono, Flux가 있는데 두 개 모두 리엑티브 스트림의 Publisher 인터페이스로 구현한 것이다.
- Mono : 하나의 데이터 항목만을 갖는 데이터셋에 최적회된 타입
- Flux : 0, 1 or 다수(무한)의 데이터를 갖는 파이프라인
프로젝트에 리액터 추가
리액터 의존성 및 테스트 추가
gradle
compile('io.projectreactor:reactor-core:3.2.10.RELEASE')
testCompile('io.projectreactor:reactor-test:3.2.10.RELEASE')
maven
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.10.RELEASE</version>
<scope>test</scope>
</dependency>
Reactive Type [Flux, Mono]
Reactive Streams는 네 개의 인터페이스가 정의되어 있다.
- Publisher<T>
- Subscriber<T>
- Subscription
- Processor <T,R>
리액터 프로젝트는 Publisher의 구현체로 Flux<T> 및 Mono<T> 두 가지가 있다.
Flux
- Flux는 Publish의 구현체로 0, 1, 또는 여러 요소(0-N개)를 발행하는 일반적인 리엑티브 스트림을 정의할 수 있다. (RxJava에서는 Flowable/Observable을 말할 수 있다.)
표현식
onNext x 0..N [ onError | onComplete ]
아래 코드는 1 ~ 5까지 배열로 출력하는 코드이다.
List<Integer> loadStream = Flux.range(1, 5)
// .repeat()
.collectList()
.block();
System.out.println(loadStream);
- range(1, 5) : 1~5 정수 시퀀스를 생성
- repeate() : 스트림이 끝나고 다시 스트림을 재구독하는 역할을 한다.
- collectList() : 생성된 모든 요소를 단일 리스트로 만든다.
- block() : 실제 구독을 기동하고 최종 결과가 도착할 때까지 실행중인 스레드를 차단한다.
결과
[1, 2, 3, 4, 5]
Mono
Mono는 최대 하나의 요소(0-1개)를 생성할 수 있는 데이터를 스트림을 정의한다.
표현식
onNect x 0..1 [onError | onComplete]
Mono와 Flux의 차이는 버퍼 중복과 동기화 작업을 생략하기 때문에 Mono를 보다 효율적으로 사용할 수 있고, 응용프로그램 API가 최대 하나의 원소를 반환하는 경우 유용하다. 또한 클라이언트에게 작업이 완료됐음을 알리는데 사용할 수 있다.
Mono와 Flux는 서로 변환이 가능하다. 예를들어 Flux.collectList()는 Mono<List>를 반환하고 Mono.flux는 Flux를 반환한다.
from 을이용하여 Flux을 Mono로 변환할 수 있다.
Flux loadStream = Flux.range(1, 5);
Mono.from(loadStream).subscribe(data -> System.out.println(data));
[결과]
1
RxJava 2.x의 리액티브 타입
- Observable : RxJava 1.x 와 거의 비슷하지만 null값을 허용하지 않고, 배압을 지원하지 않고, Publisher 인터페이스를 구현하지 않는다. 그래서 리액티브 스트림의 스팩과 호환되지 않는다. 반면 Flowable 타입보다 오버헤드가 적다.
- Flowable : Flux와 동일한 역할을 하고 Reactive Streams의 Publisher를 구현했다. Flowable API는 Publisher 유형의 인수를 사용할 수 있도록 설계되어 있다.
- Single : 하나의 요소를 생성하는 스트림을 나타내고, Publisher 인터페이스를 상속하지 않고, 배압전략이 필요 없다.
- Maybe : Mono 타입과 동일한 의도로 구현되었고, Publisher 인터페이스를 통해 구현하지 않아서 리액티브 스트림과 호환할 수 없다.
- Completable : RxJava 2.x에는 onError, onComplete 신호만 발생시키고 onNext신호는 생성할 수 없는 Complete 유형이 있다. Publisher 인터페이스를 구현하지 않았다.
Flux와 Mono 시퀀스 생성
Flux
//하나씩 생성
Flux<String> flux1 = Flux.just("1","2","3");
//배열형 으로 생성
Flux<Integer> flux2 = Flux.fromArray(new Integer[]{1,2,3});
//Iterable 타입으로 생성
Flux<Integer> flux3 = Flux.fromIterable(Arrays.asList(1,2,3));
//10~15까지 순차적으로 실행하는 데이터 생성
Flux<Integer> flux4 = Flux.range(10,5);
Mono
//하나씩 생성
Mono<String> mono1 = Mono.just("1");
Mono<String> mono2 = Mono.justOrEmpty(null);
Mono<String> mono3 = Mono.justOrEmpty(Optional.empty());
Mono는 HTTP 요청이나 DB Query 같은 비동기 작업을 Wrapping 하는데 유용하고, 아래의 메서드들도 제공한다.
- fromCallbe
- fromRunnable
- fromSupplier
- fromFuture
- fromCompleteStionStage
Reactive Streams 구독
subscribe() 메서드를 통해 구독할 수 있다.
- Consumer<? super T> consumer : 데이터를 하나하나 가져옴(onNext)
- Consumer<? super Throwable> errorConsumer : 에러를 통지(onError)
- Runnable completeConsumer : 완료를 통지(onComplete)
- Consumer<? super Subscription > subscriptionConsumer : 구독자가 원하는 동작을 추가
[Flux.java의 subscribe 코드]
public final Disposable subscribe() {
return subscribe(null, null, null);
}
//onNext()
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
//onNext(), onError()
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) {
Objects.requireNonNull(errorConsumer, "errorConsumer");
return subscribe(consumer, errorConsumer, null);
}
//onNext(), onError(), onComplete()
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, null);
}
//onNext(), onError(), onComplete()
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Consumer<? super Subscription > subscriptionConsumer) {
return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
completeConsumer,
subscriptionConsumer));
}
리액티브 스트림을 발행하고 구독
[subscribe 코드]
Flux<String> flux1 = Flux.just("1","2","3");
flux1.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
System.out.println("Exception");
}
}, new Runnable() {
@Override
public void run() {
System.out.println("Complete");
}
});
[결과]
1
2
3
Complete
사용자 정의 Subscriber 구현
Subscriber를 통해 인터페이스를 직접 구현 할 수 있다.
public class ReactMain {
public static void main(String[] args) {
Flux.just("Hello", "world")
.subscribe(subscribers());
}
public static Subscriber<String> subscribers() {
Subscriber<String> subscriber = new Subscriber<String>() {
volatile Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
System.out.println("init onsubscribe");
subscription = s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String o) {
System.out.println("onNext");
System.out.println("Data : "+ o);
}
@Override
public void onError(Throwable t) {
System.out.println("Exception!!");
System.out.println(t);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
return subscriber;
}
}
[결과]
init onsubscribe
onNext
Data : Hello
onNext
Data : world
onComplete
구독을 직접 만들어서 구현하게되면 1차원적 코드 흐름이 깨져 오류를 발생하기 쉽다. 그래서 스스로 배압을 관리하고 가입자에 대한 TCK를 잘 구현해야 한다.
그래서 BaseSubscriber 클래스를 상속하여 사용하는 것이 더 좋은 방법으로 TCK에 호환되는 구독자를 훨씬 쉽게 구현할 수 있다. 그리고 hookOnSubscribe, hookOnNext, hookOnError, hookOnCancel, hookOncomplete 등 메서드 들을 재정의 하여 사용할 수 있다.
public class ReactMain {
public static void main(String[] args) {
Flux.just("Hello", "world")
.subscribe(new CustomSubscriber());
}
private static class CustomSubscriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("init request");
request(Integer.MAX_VALUE);
}
@Override
protected void hookOnNext(T value) {
System.out.println("onNext");
System.out.println(value);
}
}
}
[결과]
init request
onNext
Hello
onNext
world
참조
'Spring' 카테고리의 다른 글
[Srping] Webflux (1) | 2023.05.02 |
---|---|
[Srping Reactor] 리액티브 오퍼레이션 (0) | 2023.05.02 |
[Spring] Reactive Stream 개요 (0) | 2023.05.02 |
@EventListener를 사용한 발행-구독 패턴 (0) | 2023.05.02 |
AOP(Aspect Oriented Programming) (0) | 2023.03.31 |