flatMap
map 과 마찬가지로 원본 데이터를 변환하여 통지하는 연산자이다. 다만 map 메서드와 달리 여러 데이터가 담긴 Flowable/Observable을 반환하여 데이터 한 개로 여러 데이터를 통지할 수 있다. 또한 Flowable/Observable을 반환해 특정 데이터를 통지하지 않거나 여러 Flowable/Observable을 반환해 에러를 통지할 수 있다.
flatMap-1 : 원본 타임라인에서 데이터를 받을 때마다 Flowable/Observable을 생성하고 결과 데이터를 통지한다.(생성한 Flowable/Observable이 다른 스레드에서 수행되면 통지 데이터는 비동기로 생성)
flatMap-2 : 원본 통지 타임라인에서 데이터를 받아 Flowable/Observable을 생성한 후 원본 데이터와 새로 생성한 Flowable/Observable의 데이터를 또 다시 새로운 데이터를 생성하고 결과를 통지한다.
flatMap-3 : 원본 타임라인에서 데이터를 받고 에러통지와 완료 통지 Flowable/Observable을 각각 생성하여 결과 데이터는 완료 시점에 대체 데이터를 통지하고 그 후에 완료를 통지한다.
flatMap은 변환결과가 null이거나 조건에 맞지 않은 데이터인 경우 또는 에러를 통지해야할 경우 Flowable/Observable을 반환하여 해당 데이터를 건너뛰거나 에러를 통지 할 수 있다.
메서드 arguments
flatMap(mapper)
mpaper는 원본 데이터를 받아서 어떤 작업을 수행하고 결과를 Flowable/Observable로 생성하여 데이터를 각각 통지하는 인터페이스이다. 데이터 반환 시 입력 데이터 한개로 여러 데이터를 통지할 수 있고, 비어있는 Flowable/Observable을 반환하면 데이터를 통지하지 않게 할 수 있고, 에러를 반환하면 에러를 통지하고 종료한다.
[Function 메서드 인터페이스]
new Function<String, Publisher<?>>() {
@Override
public Publisher<?> apply(String data) throws Exception {
return null;
}
}
[소스코드]
공백데이터를 제외하고 통지하는 코드
public class Main {
public static void main(String[] args) {
flatMapMapper();
}
static void flatMapMapper() {
Flowable.just("A","", "B", "", "C", "","D", "")
.flatMap(new Function<String, Publisher<?>>() {
@Override
public Publisher<?> apply(String data) throws Exception {
return data.equals("") ? Flowable.empty() : Flowable.just(data.toLowerCase());
}
}).subscribe(new PrintSubscriber<>());
}
}
[통지받은 데이터를 출력하는 Subscribe]
public class PrintSubscriber<T> extends DisposableSubscriber<T> {
private String label;
public PrintSubscriber() {
}
public PrintSubscriber(String label) {
this.label = label;
}
@Override
public void onNext(T data) {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : " + data) : (threadName + " : "+ label + " : "+ data);
System.out.println(resultMsg);
}
@Override
public void onError(Throwable throwable) {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : Exception=" + throwable) : (threadName + " : "+ label + " : "+ throwable);
System.out.println(resultMsg);
}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : Complete") : (threadName + " : "+ label + " : Complete");
System.out.println(resultMsg);
}
}
[결과]
main : a
main : b
main : c
main : d
main : Complete
flatMap(mapper, combiner)
combiner는 원본 데이터와 mapper에서 생성한 Flowable/Observable의 데이터를 받아 새로운 통지 데이터를 생성하는 인터페이스이다. combiner의 반환 값이 최종적으로 통지하는 데이터이다.
[combiner 인터페이스]
new BiFunction<String, Object, Object>() {
@Override
public Object apply(String s, Object o) throws Exception {
return o;
}
}
[소스코드]
공백데이터를 제외하고 통지하는 코드를 원본 데이터와 변환한 데이터를 통지
public class Main {
public static void main(String[] args) {
flatMapCombiner();
}
static void flatMapCombiner() {
Flowable.just("A","", "B", "", "C", "","D", "")
.flatMap(new Function<String, Publisher<?>>() {
@Override
public Publisher<?> apply(String data) throws Exception {
return data.equals("") ? Flowable.empty() : Flowable.just(data.toLowerCase());
}
}, new BiFunction<String, Object, Object>() {
@Override
public Object apply(String src, Object newSrc) throws Exception {
return "[" + src + "]" + newSrc;
}
}).subscribe(new PrintSubscriber<>());
}
}
[결과]
main : [A]a
main : [B]b
main : [C]c
main : [D]d
main : Complete
flatMap(onNextMapper, onErrorMapper, onCompleteSupplier)
- onNextMaper : 받은 데이터로 새로운 Flowable/Observable을 생성하는 방법을 정의하는 인터페이스
- onErrorMapper : 에러를 통지할 때 처리하는 인터페이스
- onCompleteSupplier : 완료를 통지할 때 처리하는 인터페이스
[소스코드]
public class Main {
public static void main(String[] args) {
flatMapNext_ER_CLT();
}
static void flatMapNext_ER_CLT() {
Flowable.just(1, 2, 3, 4, 5, 0, 7)
.map(data -> 10 / data)
.flatMap(
//일반통지 onNextMaper
new Function<Integer, Publisher<?>>() {
@Override
public Publisher<?> apply(Integer data) throws Exception {
return Flowable.just(data);
}
},
//에러 발생시 처리 onErrorMapper
new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
return throwable instanceof ArithmeticException ? Flowable.just("-1") : Flowable.error(throwable);
}
},
// 완료통지 onCompleteSupplier
new Callable<Publisher<?>>() {
@Override
public Publisher<?> call() throws Exception {
return Flowable.just(100);
}
}).subscribe(new PrintSubscriber<>());
}
}
[결과]
main : 10
main : 5
main : 3
main : 2
main : 2
main : -1 //에러발생
main : Complete
'Language > Java' 카테고리의 다른 글
[RxJava] buffer (0) | 2023.04.26 |
---|---|
[RxJava] concatMap / concatMapDelayError (0) | 2023.04.26 |
[RxJava] map (0) | 2023.04.26 |
[RxJava] never (0) | 2023.04.26 |
[RxJava] empty (0) | 2023.04.26 |