거꾸로 바라본 세상
Published 2023. 5. 2. 09:27
Spring Reactor 시작하기 Spring
반응형

Spring Reactor 

리액티브 프로그래밍은 일련의 작업 단계를 기술하는 것이 아니라 데이터가 전달될 파이프라인을 구성하여 데이터가 전달되는 동안 어떤 형태로든 변경 또는 사용되는 것.

 

사람의 이름을 가져와 대문자로 변경 후 출력

<명령형 코드>

String name = "devPaik";
String capitalName = name.toUpperCase();
String greeting = "Hello, "+ capitalName + "!";
System.out.println(greeting);

<리엑티브 코드>

Mono.just("devPaik")
    .map(n -> n.toUpperCase())
    .map(cn -> "Hello," + cn + "!")
    .subscribe(System.out::println);

위와 같이 리엑티브 코드는 데이터가 파이프라인으로 구성하는 것을 볼 수 있다.

파이프라인의 각각의 단계에서 어떻게 하던 데이터는 변경되고, 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.

리액터에는 Mono, Flux가 있는데 두 개 모두 리엑티브 스트림의 Publisher 인터페이스로 구현한 것이다.

  • Mono : 하나의 데이터 항목만을 갖는 데이터셋에 최적회된 타입
  • Flux : 0, 1 or 다수(무한)의 데이터를 갖는 파이프라인

프로젝트에 리액터 추가

리액터 의존성 및 테스트 추가

gradle

compile('io.projectreactor:reactor-core:3.2.10.RELEASE')
testCompile('io.projectreactor:reactor-test:3.2.10.RELEASE')

maven

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.10.RELEASE</version>
    <scope>test</scope>
</dependency>

Reactive Type [Flux, Mono]

Reactive Streams는 네 개의 인터페이스가 정의되어 있다.

  • Publisher<T>
  • Subscriber<T>
  • Subscription
  • Processor <T,R>

리액터 프로젝트는 Publisher의 구현체로 Flux<T>Mono<T> 두 가지가 있다.

Flux

  • Flux는 Publish의 구현체로 0, 1, 또는 여러 요소(0-N개)를 발행하는 일반적인 리엑티브 스트림을 정의할 수 있다. (RxJava에서는 Flowable/Observable을 말할 수 있다.)

표현식

onNext x 0..N [ onError | onComplete ]

Flux

아래 코드는 1 ~ 5까지 배열로 출력하는 코드이다.

List<Integer> loadStream = Flux.range(1, 5)
//    .repeat()
    .collectList()
    .block();
System.out.println(loadStream);
  • range(1, 5) : 1~5 정수 시퀀스를 생성
  • repeate() : 스트림이 끝나고 다시 스트림을 재구독하는 역할을 한다.
  • collectList() : 생성된 모든 요소를 단일 리스트로 만든다.
  • block() : 실제 구독을 기동하고 최종 결과가 도착할 때까지 실행중인 스레드를 차단한다.

결과

[1, 2, 3, 4, 5]

Mono

Mono는 최대 하나의 요소(0-1개)를 생성할 수 있는 데이터를 스트림을 정의한다.

표현식

onNect x 0..1 [onError | onComplete]

Mono

Mono와 Flux의 차이는 버퍼 중복과 동기화 작업을 생략하기 때문에 Mono를 보다 효율적으로 사용할 수 있고, 응용프로그램 API가 최대 하나의 원소를 반환하는 경우 유용하다. 또한 클라이언트에게 작업이 완료됐음을 알리는데 사용할 수 있다.

Mono와 Flux는 서로 변환이 가능하다. 예를들어 Flux.collectList()는 Mono<List>를 반환하고 Mono.flux는 Flux를 반환한다.

from 을이용하여 Flux을 Mono로 변환할 수 있다.

Flux loadStream = Flux.range(1, 5);
Mono.from(loadStream).subscribe(data -> System.out.println(data));

[결과]

1

RxJava 2.x의 리액티브 타입

  • Observable : RxJava 1.x 와 거의 비슷하지만 null값을 허용하지 않고, 배압을 지원하지 않고, Publisher 인터페이스를 구현하지 않는다. 그래서 리액티브 스트림의 스팩과 호환되지 않는다. 반면 Flowable 타입보다 오버헤드가 적다.
  • Flowable : Flux와 동일한 역할을 하고 Reactive Streams의 Publisher를 구현했다. Flowable API는 Publisher 유형의 인수를 사용할 수 있도록 설계되어 있다.
  • Single : 하나의 요소를 생성하는 스트림을 나타내고, Publisher 인터페이스를 상속하지 않고, 배압전략이 필요 없다.
  • Maybe : Mono 타입과 동일한 의도로 구현되었고, Publisher 인터페이스를 통해 구현하지 않아서 리액티브 스트림과 호환할 수 없다.
  • Completable : RxJava 2.x에는 onError, onComplete 신호만 발생시키고 onNext신호는 생성할 수 없는 Complete 유형이 있다. Publisher 인터페이스를 구현하지 않았다.

Flux와 Mono 시퀀스 생성

Flux

//하나씩 생성
Flux<String> flux1 = Flux.just("1","2","3"); 
//배열형 으로 생성
Flux<Integer> flux2 = Flux.fromArray(new Integer[]{1,2,3});
//Iterable 타입으로 생성
Flux<Integer> flux3 = Flux.fromIterable(Arrays.asList(1,2,3));
//10~15까지 순차적으로 실행하는 데이터 생성
Flux<Integer> flux4 = Flux.range(10,5);

Mono

//하나씩 생성
Mono<String> mono1 = Mono.just("1");
Mono<String> mono2 = Mono.justOrEmpty(null);
Mono<String> mono3 = Mono.justOrEmpty(Optional.empty());

Mono는 HTTP 요청이나 DB Query 같은 비동기 작업을 Wrapping 하는데 유용하고, 아래의 메서드들도 제공한다.

  • fromCallbe
  • fromRunnable
  • fromSupplier
  • fromFuture
  • fromCompleteStionStage

Reactive Streams 구독

subscribe() 메서드를 통해 구독할 수 있다.

  • Consumer<? super T> consumer : 데이터를 하나하나 가져옴(onNext)
  • Consumer<? super Throwable> errorConsumer : 에러를 통지(onError)
  • Runnable completeConsumer : 완료를 통지(onComplete)
  • Consumer<? super Subscription > subscriptionConsumer : 구독자가 원하는 동작을 추가

[Flux.java의 subscribe 코드]

public final Disposable subscribe() {
    return subscribe(null, null, null);
}
//onNext()
public final Disposable subscribe(Consumer<? super T> consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return subscribe(consumer, null, null);
}
//onNext(), onError()
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) {
    Objects.requireNonNull(errorConsumer, "errorConsumer");
    return subscribe(consumer, errorConsumer, null);
}
//onNext(), onError(), onComplete()
public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer) {
    return subscribe(consumer, errorConsumer, completeConsumer, null);
}

//onNext(), onError(), onComplete()
public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription > subscriptionConsumer) {
    return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
            completeConsumer,
            subscriptionConsumer));
}

리액티브 스트림을 발행하고 구독

[subscribe 코드]

Flux<String> flux1 = Flux.just("1","2","3");

    flux1.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            System.out.println("Exception");
        }
    }, new Runnable() {
        @Override
        public void run() {
            System.out.println("Complete");
        }
    });

[결과]

1
2
3
Complete

사용자 정의 Subscriber 구현

Subscriber를 통해 인터페이스를 직접 구현 할 수 있다.

public class ReactMain {
    public static void main(String[] args) {
        Flux.just("Hello", "world")
                .subscribe(subscribers());
    }

    public static Subscriber<String> subscribers() {
        Subscriber<String> subscriber = new Subscriber<String>() {
            volatile Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("init onsubscribe");
                subscription = s;
                subscription.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext");
                System.out.println("Data : "+ o);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Exception!!");
                System.out.println(t);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        return subscriber;
    }
}

[결과]

init onsubscribe
onNext
Data : Hello
onNext
Data : world
onComplete

구독을 직접 만들어서 구현하게되면 1차원적 코드 흐름이 깨져 오류를 발생하기 쉽다. 그래서 스스로 배압을 관리하고 가입자에 대한 TCK를 잘 구현해야 한다.

그래서 BaseSubscriber 클래스를 상속하여 사용하는 것이 더 좋은 방법으로 TCK에 호환되는 구독자를 훨씬 쉽게 구현할 수 있다. 그리고 hookOnSubscribe, hookOnNext, hookOnError, hookOnCancel, hookOncomplete 등 메서드 들을 재정의 하여 사용할 수 있다.

public class ReactMain {
    public static void main(String[] args) {
        Flux.just("Hello", "world")
                .subscribe(new CustomSubscriber());
    }

    private static class CustomSubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("init request");
            request(Integer.MAX_VALUE);
        }

        @Override
        protected void hookOnNext(T value) {
            System.out.println("onNext");
            System.out.println(value);
        }
    }
}

[결과]

init request
onNext
Hello
onNext
world

 

 

참조

https://projectreactor.io/docs/core/release/reference/

반응형

'Spring' 카테고리의 다른 글

[Srping] Webflux  (1) 2023.05.02
[Srping Reactor] 리액티브 오퍼레이션  (0) 2023.05.02
[Spring] Reactive Stream 개요  (0) 2023.05.02
@EventListener를 사용한 발행-구독 패턴  (0) 2023.05.02
AOP(Aspect Oriented Programming)  (0) 2023.03.31
profile

거꾸로 바라본 세상

@란지에。

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!