empty 빈 Flowable/Observable을 생성하는 연산자로, 처리를 시작하면 바로 onComplete를 통지한다. empty 메서드는 단독으로 거의 사용하지 않고 flatMap 메서드의 데이터가 null일 때 이를 대신해 empty 메서드를 생성하여 데이터를 통지 대상에게 재외하는 작업을 할 수 있다. public class Main { public static void main(String[] args) throws Exception { empty(); } static void empty() throws Exception { Flowable.empty().subscribe(new PrintSubscriber()); } } [Subscribe 코드] import io.reactivex.subs..
Language
defer 구독될 때마다 새로운 Flowable/Observable을 생성하는 연산자이다. just와는 다르게 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점에 데이터 생성이 필요할 때 사용한다. [소스코드] public class Main { public static void main(String[] args) throws Exception { defer(); justTime(); } static void defer() throws Exception { System.out.println("defer() locaTime"); Flowable flowable = Flowable.defer(() -> Flowable.just(LocalTime.now())); flowable.subscribe(new ..
timer 호출 시점부터 지정시간 동안 대기한 후 long type 숫자 0 하나만 통지하고 종료하는 Flowable/Observable을 생성하는 연산자이다. timer 메서드는 기본적으로 Schedulers.computation() 스케줄러의해 실행된다. [소스코드] import io.reactivex.Flowable; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { tiner(); } static void tiner() throws Exception { Fl..
interval 지정한 통지 간격(interval)마다 0부터 시작하는 long 타입의 숫자를 데이터로 통지하는 Flowable/Observable을 생성하는 연산자이다. interval 메서드는 별도 설정이 없으면 Schedulers.computation()의 스케줄러에 의해 실행된다. 만약 스케줄러를 변경하려면 인자로 스케줄러를 받는 메서드를 이용한다. 또한 interval 메서드로 생성한 Flowable/Observable은 완료 통지가 필요한 경우 take 메서드를 통해 통지할 데이터 개수를 제한하는 작업을 해야한다. [소스코드] public class Main { public static void main(String[] args) throws Exception { interval(); } st..
range, rangeLong range 메서드는 지정한 시작 값부터 지정한 개수만큼 하나씩 순서대로 증가하는 Integer 값을 통지 Flowable/Observable을 생성 연산자이고 rangeLong은 Long값을 통지 Flowable/Observable을 생성 연산자이다. ex) 시작 값이 3이고 통지하려는 데이터 수는 5이면 '3,4,5,6,7' 을 통지한다. public class Main { public static void main(String[] args) { range(); rangeLong(); } static void range() { System.out.println("range"); Flowable flowable = Flowable.range(3, 5); flo..
3. fromCallable fromCallable은 java.util.concurrent.Callable 함수형 인터페이스에서 생성한 데이터를 통지하는 연산자로 인자로 지정한 Callable의 반환 값을 데이터로 탐지하고, 작업을 마치면 완료 통지 Flowable/Observable을 생성. 통지하는 데이터는 호출될 때마다 새로 생성. [코드] public class Main { public static void main(String[] args) { formCallble(); } static void formCallble() { Flowable flowable = Flowable.fromCallable(() -> System.currentTimeMillis()); flowable.subscribe(n..
2. fromArray/fromIterable fromArray 인자로 지정한 Array를 순서대로 통지 Flowable/Observable을 생성 fromIterable 인자로 지정한 Iterable을 순서대로 통지 Flowable/Observable을 생성 [소스코드] import io.reactivex.Flowable; public class Main { public static void main(String[] args) { fromArray(); fromIterable(); } static void fromArray() { //fromArray System.out.println("Ex 1)"); String[] arrays = new String[]{"A", "B", "C", "D", "E"};..
1. just() 인자(arguments)의 데이터를 통지하는 Flowable/Observable 생성하는 연산자로 최대 10개 까지 정할 수 있고, 모든 데이터를 통지하면 완료(onComplete)를 통지Flowable/Observable을 생성한다. [소스코드] import io.reactivex.Flowable; public class Main { public static void main(String[] args) { just(); } static void just() { Flowable flowable = Flowable.just("A", "B", "C", "D", "E"); flowable.subscribe(new PrintSubscriber()); } } [Subscribe 코드] impor..
배압(Backpressure) 생산자가 1초 간격으로 100건의 데이터를 통지하는데 소비자는 1초마다 한 건의 데이터를 처리할 수 있다면 1초뒤에 처리를 기다리는 데이터가 99건 쌓이고 그 후엔 더 많은 새로운 데이터가 처리를 기다리게 된다. 이럴경우 시간이 지나면서 데이터가 쌓이고 최신 정보를 받을 수 없고, 결국 메모리가 부족해서 시스템이 다운할 가능성이 있다. 그 문제점을 해결해 주는 것이 배압(Backpressure) 이다. 배압(Backpressure) 은 데이터 통지량을 제어하는 기능을 말하는 것으로 Flowable에만 제공한다. 배압은 데이터를 받은 측이 서로 다른 스레드에서 처리하는데 Flowable과이 데이터를 통지하는 속도가 데이를 받는 측의 처리속도 보다 빠를 때 사용된다. 처리과정 ..
예외 처리 에러가 발생했을 때 대응하는 방법 1. 소비자(Subscriber/Observer)에게 에러를 통지하기 통지 처리중 에러가 발생하면 소비자에게 에러를 통지해 소비자가 에러에 대응하는 메커니즘을 제공하고, 처리 중 에러가 발생해도 중단을 하기보단 소비자에게 발생한 에러를 통지한다. subscribe(onNext) 메서드로 구독할 때 에러가 발생해도 stackTrace만 출력할 뿐 별도 에러 처리를하지 않는다. 그러므로 아무런 에러 처리 없이 구독 후 처리 작업을 진행하게되므로 주의가 필요하다. 2. 처리 작업 재시도(retry) 에러가 발생할 경우 생산자의 처리 작업을 처음부터 다시 시도하고, 소비자에게 에러를 통지하진 않는다. 그래서 네트워크가 순간적으로 중단돼 처리 작업이 실패해도 다시 실행..