거꾸로 바라본 세상
Published 2023. 4. 26. 09:37
[RxJava] flatMap Language/Java
반응형

flatMap

map 과 마찬가지로 원본 데이터를 변환하여 통지하는 연산자이다. 다만 map 메서드와 달리 여러 데이터가 담긴 Flowable/Observable을 반환하여 데이터 한 개로 여러 데이터를 통지할 수 있다. 또한 Flowable/Observable을 반환해 특정 데이터를 통지하지 않거나 여러 Flowable/Observable을 반환해 에러를 통지할 수 있다.

flatMap-1

flatMap-1 : 원본 타임라인에서 데이터를 받을 때마다 Flowable/Observable을 생성하고 결과 데이터를 통지한다.(생성한 Flowable/Observable이 다른 스레드에서 수행되면 통지 데이터는 비동기로 생성)

flatMap-2

flatMap-2 : 원본 통지 타임라인에서 데이터를 받아 Flowable/Observable을 생성한 후 원본 데이터와 새로 생성한 Flowable/Observable의 데이터를 또 다시 새로운 데이터를 생성하고 결과를 통지한다.

flatMap-3

flatMap-3 : 원본 타임라인에서 데이터를 받고 에러통지와 완료 통지 Flowable/Observable을 각각 생성하여 결과 데이터는 완료 시점에 대체 데이터를 통지하고 그 후에 완료를 통지한다.

flatMap은 변환결과가 null이거나 조건에 맞지 않은 데이터인 경우 또는 에러를 통지해야할 경우 Flowable/Observable을 반환하여 해당 데이터를 건너뛰거나 에러를 통지 할 수 있다.

메서드 arguments

flatMap(mapper)

mpaper는 원본 데이터를 받아서 어떤 작업을 수행하고 결과를 Flowable/Observable로 생성하여 데이터를 각각 통지하는 인터페이스이다. 데이터 반환 시 입력 데이터 한개로 여러 데이터를 통지할 수 있고, 비어있는 Flowable/Observable을 반환하면 데이터를 통지하지 않게 할 수 있고, 에러를 반환하면 에러를 통지하고 종료한다.

[Function 메서드 인터페이스]

new Function<String, Publisher<?>>() {
    @Override
    public Publisher<?> apply(String data) throws Exception {
        return null;
    }
}

[소스코드]

공백데이터를 제외하고 통지하는 코드

public class Main {
    public static void main(String[] args) {
        flatMapMapper();
    }
    static void flatMapMapper() {
        Flowable.just("A","", "B", "", "C", "","D", "")
                .flatMap(new Function<String, Publisher<?>>() {
                    @Override
                    public Publisher<?> apply(String data) throws Exception {
                        return data.equals("") ? Flowable.empty() : Flowable.just(data.toLowerCase());
                    }
                }).subscribe(new PrintSubscriber<>());
    }
}

[통지받은 데이터를 출력하는 Subscribe]

public class PrintSubscriber<T> extends DisposableSubscriber<T> {

    private String label;

    public PrintSubscriber() {
    }

    public PrintSubscriber(String label) {
        this.label = label;
    }
    @Override
    public void onNext(T data) {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : " + data) : (threadName + " : "+ label + " : "+ data);
        System.out.println(resultMsg);
    }

    @Override
    public void onError(Throwable throwable) {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : Exception=" + throwable) : (threadName + " : "+ label + " : "+ throwable);
        System.out.println(resultMsg);
    }

    @Override
    public void onComplete() {
        String threadName = Thread.currentThread().getName();
        String resultMsg = label == null ? (threadName + " : Complete") : (threadName + " : "+ label + " : Complete");
        System.out.println(resultMsg);
    }
}

[결과]

main : a
main : b
main : c
main : d
main : Complete

flatMap(mapper, combiner)

combiner는 원본 데이터와 mapper에서 생성한 Flowable/Observable의 데이터를 받아 새로운 통지 데이터를 생성하는 인터페이스이다. combiner의 반환 값이 최종적으로 통지하는 데이터이다.

[combiner 인터페이스]

 new BiFunction<String, Object, Object>() {
    @Override
    public Object apply(String s, Object o) throws Exception {
        return o;
    }
}

[소스코드]
공백데이터를 제외하고 통지하는 코드를 원본 데이터와 변환한 데이터를 통지

public class Main {
    public static void main(String[] args) {
        flatMapCombiner();
    }
    static void flatMapCombiner() {
        Flowable.just("A","", "B", "", "C", "","D", "")
                .flatMap(new Function<String, Publisher<?>>() {
                    @Override
                    public Publisher<?> apply(String data) throws Exception {
                        return data.equals("") ? Flowable.empty() : Flowable.just(data.toLowerCase());
                    }
                }, new BiFunction<String, Object, Object>() {
                    @Override
                    public Object apply(String src, Object newSrc) throws Exception {
                        return "[" + src + "]" + newSrc;
                    }
                }).subscribe(new PrintSubscriber<>());
    }
}

[결과]

main : [A]a
main : [B]b
main : [C]c
main : [D]d
main : Complete

flatMap(onNextMapper, onErrorMapper, onCompleteSupplier)

  • onNextMaper : 받은 데이터로 새로운 Flowable/Observable을 생성하는 방법을 정의하는 인터페이스
  • onErrorMapper : 에러를 통지할 때 처리하는 인터페이스
  • onCompleteSupplier : 완료를 통지할 때 처리하는 인터페이스

[소스코드]

public class Main {
    public static void main(String[] args) {
        flatMapNext_ER_CLT();
    }

    static void flatMapNext_ER_CLT() {
        Flowable.just(1, 2, 3, 4, 5, 0, 7)
                .map(data -> 10 / data)
                .flatMap(
                    //일반통지 onNextMaper
                    new Function<Integer, Publisher<?>>() {
                    @Override
                    public Publisher<?> apply(Integer data) throws Exception {
                        return Flowable.just(data);
                    }
                }, 
                    //에러 발생시 처리 onErrorMapper
                    new Function<Throwable, Publisher<?>>() {
                    @Override
                    public Publisher<?> apply(Throwable throwable) throws Exception {
                        return throwable instanceof ArithmeticException ? Flowable.just("-1") : Flowable.error(throwable);
                    }
                }, 
                    // 완료통지 onCompleteSupplier
                    new Callable<Publisher<?>>() {
                    @Override
                    public Publisher<?> call() throws Exception {
                        return Flowable.just(100);
                    }
                }).subscribe(new PrintSubscriber<>());
    }
}

[결과]

main : 10
main : 5
main : 3
main : 2
main : 2
main : -1 //에러발생
main : Complete
반응형

'Language > Java' 카테고리의 다른 글

[RxJava] buffer  (0) 2023.04.26
[RxJava] concatMap / concatMapDelayError  (0) 2023.04.26
[RxJava] map  (0) 2023.04.26
[RxJava] never  (0) 2023.04.26
[RxJava] empty  (0) 2023.04.26
profile

거꾸로 바라본 세상

@란지에。

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!