3. fromCallable fromCallable은 java.util.concurrent.Callable 함수형 인터페이스에서 생성한 데이터를 통지하는 연산자로 인자로 지정한 Callable의 반환 값을 데이터로 탐지하고, 작업을 마치면 완료 통지 Flowable/Observable을 생성. 통지하는 데이터는 호출될 때마다 새로 생성. [코드] public class Main { public static void main(String[] args) { formCallble(); } static void formCallble() { Flowable flowable = Flowable.fromCallable(() -> System.currentTimeMillis()); flowable.subscribe(n..
Reactive Programming
2. fromArray/fromIterable fromArray 인자로 지정한 Array를 순서대로 통지 Flowable/Observable을 생성 fromIterable 인자로 지정한 Iterable을 순서대로 통지 Flowable/Observable을 생성 [소스코드] import io.reactivex.Flowable; public class Main { public static void main(String[] args) { fromArray(); fromIterable(); } static void fromArray() { //fromArray System.out.println("Ex 1)"); String[] arrays = new String[]{"A", "B", "C", "D", "E"};..
1. just() 인자(arguments)의 데이터를 통지하는 Flowable/Observable 생성하는 연산자로 최대 10개 까지 정할 수 있고, 모든 데이터를 통지하면 완료(onComplete)를 통지Flowable/Observable을 생성한다. [소스코드] import io.reactivex.Flowable; public class Main { public static void main(String[] args) { just(); } static void just() { Flowable flowable = Flowable.just("A", "B", "C", "D", "E"); flowable.subscribe(new PrintSubscriber()); } } [Subscribe 코드] impor..
배압(Backpressure) 생산자가 1초 간격으로 100건의 데이터를 통지하는데 소비자는 1초마다 한 건의 데이터를 처리할 수 있다면 1초뒤에 처리를 기다리는 데이터가 99건 쌓이고 그 후엔 더 많은 새로운 데이터가 처리를 기다리게 된다. 이럴경우 시간이 지나면서 데이터가 쌓이고 최신 정보를 받을 수 없고, 결국 메모리가 부족해서 시스템이 다운할 가능성이 있다. 그 문제점을 해결해 주는 것이 배압(Backpressure) 이다. 배압(Backpressure) 은 데이터 통지량을 제어하는 기능을 말하는 것으로 Flowable에만 제공한다. 배압은 데이터를 받은 측이 서로 다른 스레드에서 처리하는데 Flowable과이 데이터를 통지하는 속도가 데이를 받는 측의 처리속도 보다 빠를 때 사용된다. 처리과정 ..
예외 처리 에러가 발생했을 때 대응하는 방법 1. 소비자(Subscriber/Observer)에게 에러를 통지하기 통지 처리중 에러가 발생하면 소비자에게 에러를 통지해 소비자가 에러에 대응하는 메커니즘을 제공하고, 처리 중 에러가 발생해도 중단을 하기보단 소비자에게 발생한 에러를 통지한다. subscribe(onNext) 메서드로 구독할 때 에러가 발생해도 stackTrace만 출력할 뿐 별도 에러 처리를하지 않는다. 그러므로 아무런 에러 처리 없이 구독 후 처리 작업을 진행하게되므로 주의가 필요하다. 2. 처리 작업 재시도(retry) 에러가 발생할 경우 생산자의 처리 작업을 처음부터 다시 시도하고, 소비자에게 에러를 통지하진 않는다. 그래서 네트워크가 순간적으로 중단돼 처리 작업이 실패해도 다시 실행..
비동기 처리 비동기 처리는 어던 작업을 실행하는 동안 해당 처리가 끝나기를 기다리지 않고 다른 작업을 수행할 수 있는 것을 말한다. RxJava는 비동기 처리를 수행하는데 필요한 API를 제공하고 기존에 구축한 비지니스 로직에 영향을 주지 않고 생산자 또는 소비자의 작업을 비동기로 처리하도록 교체할 수 있다. 또한, 용도별로 적절하게 스레드를 관리하는 클래스를 제공하여 직접 스레드를 관리하는 번거로움이 줄었다. RxJava 비동기 처리 RxJava에서 개발자가 직접 비동기 처리를 하도록 설정하거나 연산자 내에서 시간을 다루는 작업을 하지 않는 한, 생산자의 처리 작업을 실행하는 스레드에서 각 연산자의 처리 작업과 소비자의 처리 작업이 실행된다. 개발자가 직접 비동기 처리를 하도록 설정하면 생산자와 연산자..
Marble Diagram(마블 다이어그램) ReactiveX의 문서를 읽다보면 원형 또는 사각형 등의 도형과 화살표가 들어간 그림을 말한다. 주로 Flowable/Observable에서 메서드를 호출할 때 시간경과에 따른 데이터가 어떻게 전달되고 변화되는지를 표현한다. [Filter Diagram] filter 메서드는 원본 데이터 중 지정한 조건에 맞는 데이터만 통지하는 메서드로 Flowable과 Observable에 있는 메서드로 둘 다 같은 처리를 수행한다. filter 메서드에서 인자를 받아 데이터를 받을건지 제외할 것인지 판단하는 함수형 인터페이스를 사용하여 결과가 true 인 데이터만 통지를 한다. [에러가 발생할 경우] 처리가 정상적으로 끝나지 못하고 중간에 에러를 통지하게 되면 해당 위치..
Reactive Streams 라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘으로 인터페이스만 제공하고 구현은 각 라이브러리와 프레임워크에서 한다. 구성 데이터를 통지하는 Publisher(생산자)와 데이터를 받아 처리하는 Subscriber(소비자)로 구성된다. Subscriber가 Publisher를 구독(Subscribe)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있다. Publisher : 데이터를 통지하는 생산자 Subscriber : 데이터를 받아 처리하는 소비자 Subscriber가 구독하고 Publisher가데이터를 통지하고 Subscriber가 데이터를 받을 때까지 처리과정의 흐름이다. Subscriber는 Publish..
RxJava 개념 RxJava는 Reactive Progrmming을 구현하는데 사용하는 라이브러리 이다. 에릭 마이어가 개발한 .NET 프레임워크의 실험적 라이브러리인 Reactive Extensions(Rx) 를 2009년 마이크로소프트에서 공개하고 2013년 넷플릭스에서 자바로 이식한 것이 RxJava의 시작이다. 현재 Reactive Extensions를 다루는 라이브러리는 ReactiveX라는 오픈 소스 프로젝트로 바뀌어 자바와 .NET 뿐만 아니라 자바스크립트, 스위프트, 등 여러 프로그램언어를 지원하는 라이브러리를 제공한다. Reactive Extensions는 동기식 또는 비동기식 스트림과 관계 없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구이다. ReactiveX는..