거꾸로 바라본 세상
반응형

concatMap / concatMapDelayError

받을 데이터를 Flowable/Observable로 변환하여 Flowable/Observable을 하나씩 순서대로 실행하여 데이터를 통지한다.

concatMap

flatMap 메서드와 다르게 데이터를 받는 순서대로 Flowable/Observable을 생성하여 하나 씩 실행한다. 여러 데이터를 받더라도 첫번 째 데이터로 생성한 Flowable/Observable의 실행이 완료되지 않으면 다음 데이터를 만든 Flowable/Observable의 데이터는 통지되지 않는다. 또한 호출하는 스레드와 별도의 스레드에서 실행되더라도 데이터를 받을 때 바로 실행되지 않으므로 데이터를 받는 순서대로 처리하는 것은 보장하지만, 처리성능에는 영향을 줄 수 있다.

concatMap은 에러가 발생한 시점에 에러를 통지하지만, concatMapDelayError은 에러가 발생하여도 다른 데이터로 생성한 Flowable/Observable의 처리가 완료 될 때까지 에러 통지를 미룬다.

[ConcatMap.java]

import io.reactivex.Flowable;

import java.util.concurrent.TimeUnit;

public class ConcatMap {
    public static void main(String[] args) throws Exception {
        Flowable<String> flowable = Flowable.range(10, 3) // 10부터 3번 카운트 실행
                .concatMap(src -> Flowable.interval(500L, TimeUnit.MILLISECONDS)
                        .take(2) //2건 통지
                        .map(data -> {
                            long time = System.currentTimeMillis();
                            return time + "ms: [" + src + "] " + data;
                        }));
        flowable.subscribe(new PrintSubscriber<>()); //구독
        Thread.sleep(4000L);

        Flowable<String> flowable2 = Flowable.range(10, 3) // 10부터 3번 카운트 실행
                .concatMapDelayError(src -> Flowable.interval(500L, TimeUnit.MILLISECONDS)
                        .take(2) //2건 통지
                        .map(data -> {
                            long time = System.currentTimeMillis();
                            return time + "ms: [" + src + "] " + data;
                        }));
        flowable2.subscribe(new PrintSubscriber<>()); //구독
        Thread.sleep(4000L);
    }

}

[PrintSubscriber.java]


import io.reactivex.subscribers.DisposableSubscriber;

public class PrintSubscriber<T> extends DisposableSubscriber<T> {

    private String label;

    public PrintSubscriber() {
    }

    public PrintSubscriber(String label) {
        this.label = label;
    }

    @Override
    public void onNext(T data) {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : " + data) : (threadName + " : " + label + " : " + data);
        System.out.println(resultMsg);
    }

    @Override
    public void onError(Throwable throwable) {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : Exception=" + throwable) : (threadName + " : " + label + " : " + throwable);
        System.out.println(resultMsg);
    }

    @Override
    public void onComplete() {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : Complete") : (threadName + " : " + label + " : Complete");
        System.out.println(resultMsg);
    }
}
반응형

'Language > Java' 카테고리의 다른 글

[RxJava] buffer  (0) 2023.04.26
[RxJava] flatMap  (0) 2023.04.26
[RxJava] map  (0) 2023.04.26
[RxJava] never  (0) 2023.04.26
[RxJava] empty  (0) 2023.04.26
profile

거꾸로 바라본 세상

@란지에。

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!