Spring

Spring Reactor 시작하기

조슈아。 2023. 5. 2. 09:27
반응형

Spring Reactor 

리액티브 프로그래밍은 일련의 작업 단계를 기술하는 것이 아니라 데이터가 전달될 파이프라인을 구성하여 데이터가 전달되는 동안 어떤 형태로든 변경 또는 사용되는 것.

 

사람의 이름을 가져와 대문자로 변경 후 출력

<명령형 코드>

String name = "devPaik";
String capitalName = name.toUpperCase();
String greeting = "Hello, "+ capitalName + "!";
System.out.println(greeting);

<리엑티브 코드>

Mono.just("devPaik")
    .map(n -> n.toUpperCase())
    .map(cn -> "Hello," + cn + "!")
    .subscribe(System.out::println);

위와 같이 리엑티브 코드는 데이터가 파이프라인으로 구성하는 것을 볼 수 있다.

파이프라인의 각각의 단계에서 어떻게 하던 데이터는 변경되고, 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.

리액터에는 Mono, Flux가 있는데 두 개 모두 리엑티브 스트림의 Publisher 인터페이스로 구현한 것이다.

  • Mono : 하나의 데이터 항목만을 갖는 데이터셋에 최적회된 타입
  • Flux : 0, 1 or 다수(무한)의 데이터를 갖는 파이프라인

프로젝트에 리액터 추가

리액터 의존성 및 테스트 추가

gradle

compile('io.projectreactor:reactor-core:3.2.10.RELEASE')
testCompile('io.projectreactor:reactor-test:3.2.10.RELEASE')

maven

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.10.RELEASE</version>
    <scope>test</scope>
</dependency>

Reactive Type [Flux, Mono]

Reactive Streams는 네 개의 인터페이스가 정의되어 있다.

  • Publisher<T>
  • Subscriber<T>
  • Subscription
  • Processor <T,R>

리액터 프로젝트는 Publisher의 구현체로 Flux<T>Mono<T> 두 가지가 있다.

Flux

  • Flux는 Publish의 구현체로 0, 1, 또는 여러 요소(0-N개)를 발행하는 일반적인 리엑티브 스트림을 정의할 수 있다. (RxJava에서는 Flowable/Observable을 말할 수 있다.)

표현식

onNext x 0..N [ onError | onComplete ]

Flux

아래 코드는 1 ~ 5까지 배열로 출력하는 코드이다.

List<Integer> loadStream = Flux.range(1, 5)
//    .repeat()
    .collectList()
    .block();
System.out.println(loadStream);
  • range(1, 5) : 1~5 정수 시퀀스를 생성
  • repeate() : 스트림이 끝나고 다시 스트림을 재구독하는 역할을 한다.
  • collectList() : 생성된 모든 요소를 단일 리스트로 만든다.
  • block() : 실제 구독을 기동하고 최종 결과가 도착할 때까지 실행중인 스레드를 차단한다.

결과

[1, 2, 3, 4, 5]

Mono

Mono는 최대 하나의 요소(0-1개)를 생성할 수 있는 데이터를 스트림을 정의한다.

표현식

onNect x 0..1 [onError | onComplete]

Mono

Mono와 Flux의 차이는 버퍼 중복과 동기화 작업을 생략하기 때문에 Mono를 보다 효율적으로 사용할 수 있고, 응용프로그램 API가 최대 하나의 원소를 반환하는 경우 유용하다. 또한 클라이언트에게 작업이 완료됐음을 알리는데 사용할 수 있다.

Mono와 Flux는 서로 변환이 가능하다. 예를들어 Flux.collectList()는 Mono<List>를 반환하고 Mono.flux는 Flux를 반환한다.

from 을이용하여 Flux을 Mono로 변환할 수 있다.

Flux loadStream = Flux.range(1, 5);
Mono.from(loadStream).subscribe(data -> System.out.println(data));

[결과]

1

RxJava 2.x의 리액티브 타입

  • Observable : RxJava 1.x 와 거의 비슷하지만 null값을 허용하지 않고, 배압을 지원하지 않고, Publisher 인터페이스를 구현하지 않는다. 그래서 리액티브 스트림의 스팩과 호환되지 않는다. 반면 Flowable 타입보다 오버헤드가 적다.
  • Flowable : Flux와 동일한 역할을 하고 Reactive Streams의 Publisher를 구현했다. Flowable API는 Publisher 유형의 인수를 사용할 수 있도록 설계되어 있다.
  • Single : 하나의 요소를 생성하는 스트림을 나타내고, Publisher 인터페이스를 상속하지 않고, 배압전략이 필요 없다.
  • Maybe : Mono 타입과 동일한 의도로 구현되었고, Publisher 인터페이스를 통해 구현하지 않아서 리액티브 스트림과 호환할 수 없다.
  • Completable : RxJava 2.x에는 onError, onComplete 신호만 발생시키고 onNext신호는 생성할 수 없는 Complete 유형이 있다. Publisher 인터페이스를 구현하지 않았다.

Flux와 Mono 시퀀스 생성

Flux

//하나씩 생성
Flux<String> flux1 = Flux.just("1","2","3"); 
//배열형 으로 생성
Flux<Integer> flux2 = Flux.fromArray(new Integer[]{1,2,3});
//Iterable 타입으로 생성
Flux<Integer> flux3 = Flux.fromIterable(Arrays.asList(1,2,3));
//10~15까지 순차적으로 실행하는 데이터 생성
Flux<Integer> flux4 = Flux.range(10,5);

Mono

//하나씩 생성
Mono<String> mono1 = Mono.just("1");
Mono<String> mono2 = Mono.justOrEmpty(null);
Mono<String> mono3 = Mono.justOrEmpty(Optional.empty());

Mono는 HTTP 요청이나 DB Query 같은 비동기 작업을 Wrapping 하는데 유용하고, 아래의 메서드들도 제공한다.

  • fromCallbe
  • fromRunnable
  • fromSupplier
  • fromFuture
  • fromCompleteStionStage

Reactive Streams 구독

subscribe() 메서드를 통해 구독할 수 있다.

  • Consumer<? super T> consumer : 데이터를 하나하나 가져옴(onNext)
  • Consumer<? super Throwable> errorConsumer : 에러를 통지(onError)
  • Runnable completeConsumer : 완료를 통지(onComplete)
  • Consumer<? super Subscription > subscriptionConsumer : 구독자가 원하는 동작을 추가

[Flux.java의 subscribe 코드]

public final Disposable subscribe() {
    return subscribe(null, null, null);
}
//onNext()
public final Disposable subscribe(Consumer<? super T> consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return subscribe(consumer, null, null);
}
//onNext(), onError()
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) {
    Objects.requireNonNull(errorConsumer, "errorConsumer");
    return subscribe(consumer, errorConsumer, null);
}
//onNext(), onError(), onComplete()
public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer) {
    return subscribe(consumer, errorConsumer, completeConsumer, null);
}

//onNext(), onError(), onComplete()
public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription > subscriptionConsumer) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            subscriptionConsumer));
}

리액티브 스트림을 발행하고 구독

[subscribe 코드]

Flux<String> flux1 = Flux.just("1","2","3");

    flux1.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            System.out.println("Exception");
        }
    }, new Runnable() {
        @Override
        public void run() {
            System.out.println("Complete");
        }
    });

[결과]

1
2
3
Complete

사용자 정의 Subscriber 구현

Subscriber를 통해 인터페이스를 직접 구현 할 수 있다.

public class ReactMain {
    public static void main(String[] args) {
        Flux.just("Hello", "world")
                .subscribe(subscribers());
    }

    public static Subscriber<String> subscribers() {
        Subscriber<String> subscriber = new Subscriber<String>() {
            volatile Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("init onsubscribe");
                subscription = s;
                subscription.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext");
                System.out.println("Data : "+ o);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Exception!!");
                System.out.println(t);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        return subscriber;
    }
}

[결과]

init onsubscribe
onNext
Data : Hello
onNext
Data : world
onComplete

구독을 직접 만들어서 구현하게되면 1차원적 코드 흐름이 깨져 오류를 발생하기 쉽다. 그래서 스스로 배압을 관리하고 가입자에 대한 TCK를 잘 구현해야 한다.

그래서 BaseSubscriber 클래스를 상속하여 사용하는 것이 더 좋은 방법으로 TCK에 호환되는 구독자를 훨씬 쉽게 구현할 수 있다. 그리고 hookOnSubscribe, hookOnNext, hookOnError, hookOnCancel, hookOncomplete 등 메서드 들을 재정의 하여 사용할 수 있다.

public class ReactMain {
    public static void main(String[] args) {
        Flux.just("Hello", "world")
                .subscribe(new CustomSubscriber());
    }

    private static class CustomSubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("init request");
            request(Integer.MAX_VALUE);
        }

        @Override
        protected void hookOnNext(T value) {
            System.out.println("onNext");
            System.out.println(value);
        }
    }
}

[결과]

init request
onNext
Hello
onNext
world

 

 

참조

https://projectreactor.io/docs/core/release/reference/

반응형

'Spring' 카테고리의 다른 글

[Srping] Webflux  (1) 2023.05.02
[Srping Reactor] 리액티브 오퍼레이션  (0) 2023.05.02
[Spring] Reactive Stream 개요  (0) 2023.05.02
@EventListener를 사용한 발행-구독 패턴  (0) 2023.05.02
AOP(Aspect Oriented Programming)  (0) 2023.03.31