RxJava 개념
RxJava는 Reactive Progrmming을 구현하는데 사용하는 라이브러리 이다.
에릭 마이어가 개발한 .NET 프레임워크의 실험적 라이브러리인 Reactive Extensions(Rx) 를 2009년 마이크로소프트에서 공개하고 2013년 넷플릭스에서 자바로 이식한 것이 RxJava의 시작이다.
현재 Reactive Extensions를 다루는 라이브러리는 ReactiveX라는 오픈 소스 프로젝트로 바뀌어 자바와 .NET 뿐만 아니라 자바스크립트, 스위프트, 등 여러 프로그램언어를 지원하는 라이브러리를 제공한다.
Reactive Extensions는 동기식 또는 비동기식 스트림과 관계 없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구이다.
ReactiveX는 Observer Pattern, Iterator Pattern 및 함수형 프로그램의 조합으로 정의된다.
ReactiveX : http://reactivex.io
특징
- RxJava는 Observer Pattern을 활용하였다. Observer 패턴은 감시 대상 객체의 상태가 변화면 이를 관찰하는 객체에 알려주는 구조이다. 그래서 데이터를 생성하는 측과 데이터를 소비하는 측으로 나눌 수 있고 쉽게 데이터 스트림을 처리할 수 있다.
- 비동기 처리를 할 수 있다. Reactive Streams 규칙의 근간인 Observable 규약이라는 RxJava 개발 가이드라인을 따르면 직접 스레드를 관리하는 번거로움에서 해방되고 구현도 간단하게 할 수 있다.
기본구조
RxJava는 데이터를 만들고 통지하는 생산자(Publisher)와 통지된 데이터를 받아 처리하는 소비자(Subscriber)로 구성된다. 소비자는 생산자를 구독하여 생산자가 통지한 데이터를 소비자가 받아서 처리한다.
크게 두 가지 로 나누는데, Reactive Streams을 지원하는 Flowable과 Subscriber가 있고, Reactive Streams을 지원하지 않는 Observable과 Observer가 있다.
구분 | 생산자 | 소비자 |
---|---|---|
Reactive Streams 지원 | Flowable | Subscriber |
Reactive Streams 미지원 | Observable | Observer |
Flowable로 구독시작(onSubscribe)을 하면 데이터 통지(onNext), 에러통지 (onError), 완료통지(onComplete)를 수행하고 통지받은 시점의 소비자인 Subscriber로 처리하고 데이터 개수 요청 및 구독 해지를 할 수 있다. (RxJava 2.x 버전은 Reactive Streams을 사용하지 않는다.)
Observable과 Observer Flowable과 onSubScriber와 같은 기능을 수행하지만 통지하는 데이터 개수를 제어하는 배압 기능이 없기 때문에 데이터 개수를 요청하지 않는다. 그래서 onSubScription을 사용하지 않고 Disposable이라는 구독 해지 메서드가 있는 인터페이스를 사용한다. Disposable은 구독 시작 시점에 onSubscribe 메서드의 인자로 Observer에게 전달된다.
Disposable는 구독 해지를 위한 두 가지 메서드가 있다.
method | descrption |
---|---|
dispose | 구독을 해지한다. |
isDisposed | 구독을 하지하면 true, 하지않으면 false를 반환한다 |
그러므로 Observable과 Observer은 데이터 개수 요청을 하지 않고 데이터가 생성되자마자 Observer에게 통지된다.
연산자
RxJava는 생산자가 통지한 데이터가 소비자에게 도착하기 전에 불필요한 데이터를 삭제하거나 소비자가 사용하기 쉽게 변경해야 할 때가 있는데 이 때 Flowable/Observable의 메서드에서 새로운 Flowable/Observable을 반환하며 해당 메서드를 서로 연결해나가며 최동 데이터를 통지하는 Flowable/Observable을 생성한다. 그래서 통지하는 데이터를 생성하거나 필터링 또는 변환하는 메서드이다.
연산자를 사용하여 데이터를 출력하는 예제
[Subscription.java]
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
public class Subscription {
public static void main(String[] args) {
evenRamda();
evenNotRamda();
}
private static void evenRamda() {
Flowable<Integer> flowable = Flowable.just(1,2,3,4,5,6,7,8,9,10) //인자의 데이터를 순서대로 통지하는 Flowable 생성
.filter(data -> data % 2 == 0) //짝수 데이터만 통지
.map(data -> data * 100); //데이터를 100배로 변환
//구독하여 받은 데이터를 출력
flowable.subscribe(data-> System.out.println("Data : " + data));
}
private static void evenNotRamda() {
Flowable<Integer> flowable = Flowable.just(1,2,3,4,5,6,7,8,9,10) //인자의 데이터를 순서대로 통지하는 Flowable 생성
//짝수 데이터만 통지
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer src) throws Exception {
return src % 2 == 0;
}
})
//데이터를 100배로 변환
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer data) throws Exception {
return data * 100;
}
});
//구독하여 받은 데이터를 출력
flowable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer data) throws Exception {
System.out.println("Data : " + data);
}
});
}
}
비동기 처리
RxJava는 개발자가 직접 스레드를 관리하지 않게 각 처리 목적에 맞춰 스레드를 관리하는 스케줄러(Scheduler)를 제공하며 이 스케줄러를 이용하면 어떤 스레드에서 무엇을 처리할지 제어할 수 있다.
스케줄러는 데이터를 생성해 통지(flowable/Observable)하는 부분과 데이터를 받아 처리하는 부분을 설정 할 수 있다. 이후 데이터의 필터나 변환을 하는 SubScriber/Observer가 데이터 수신 처리를 어느 스케줄러에서 처리 할 지를 제어한다.
비동기 생성
//1초마다 0부터 시작하는 값을 비동기로 통지하는 interval 생성
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(e -> { //결과 통지 출력
System.out.println("Received : " + e);
});
Thread.sleep(5000);
결과
Received : 0
Received : 1
Received : 2
Received : 3
Received : 4
Thread.sleep(5000);을 지우면 아무것도 출력하지 않고 종료하게 되는데 이벤트가 생성되는 것과 별개의 스레드에서 사용되기 때문이다.
주의할 점
- 데이터를 통지하는 측과 받는 측은 데이터 통지 시에만 데이터를 주고 받아야 하고, 그 외 요인으로 서로의 행동에 영향을 주지 않아야 함
- 비동기로 처리할 때는 생산자에서 소비자 까지의 처리가 노출되지 않도록 폐쇠(closed)적으로 개발하면 외부요인의 참조를 줄일 수 있다.
- 외부에서 데이터를 반영하는 것은 소비자(SubScriber/Observer)에서 한다.
import io.reactivex.Flowable;
import io.reactivex.functions.BiFunction;
import java.util.concurrent.TimeUnit;
public class AsyncEff {
private enum State {
ADD, MULTIPLY
}
//계산 방법
private static State calcMethod;
public static void main(String[] args) throws Exception {
operatorRamda();
operatorNotRamda();
Thread.sleep(1000);
System.out.println("계산방법 변경");
calcMethod = State.MULTIPLY;
Thread.sleep(2000);
}
private static void operatorRamda() {
calcMethod = State.ADD;
Flowable<Long> flowable =
Flowable.interval(300L, TimeUnit.MILLISECONDS) //300밀리초마다 데이터를 통지한다.
.take(7) //7건까지 통지
.scan((sum, data) -> (calcMethod == State.ADD) ? sum + data : sum * data); //데이터 계산
flowable.subscribe(data -> System.out.println("Data : " + data));
}
private static void operatorNotRamda() {
calcMethod = State.ADD;
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) //300밀리초마다 데이터를 통지한다.
.take(7) //7건까지 통지
.scan(new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long sum, Long data) throws Exception {
return (calcMethod == State.ADD) ? sum + data : sum * data; //데이터 계산
}
});
flowable.subscribe(data -> System.out.println("Data : " + data));
}
}
결과
Data : 0
Data : 0
Data : 1
Data : 1
Data : 3
Data : 3
계산방법 변경
Data : 9
Data : 9
Data : 36
Data : 36
Data : 180
Data : 180
Data : 1080
Data : 1080
Cold Constructor와 Hot Constructor
[Cold Constructor]
1개의 소비자와 구독관계를 맺는 것으로 통지하는 데이터의 타임라인을 구독할 때마다 생성한다. 그래서 생산자를 구독하면 생산자 처리가 시작된다.(RxJava는 기본적으로 Cold Constructor)
[Hot Constructor]
여러 소비자와 구독관계를 맺는 것으로 이미 생성 통지한 데이터의 타임라인에 나중에 소비자가 참가하는 것을 허용한다. 생산자를 구독해도 생산잔 처리가 되지 않을 수 있다. 또한 이미 처리를 시작한 생산자를 구독하면 구독한 시점부터 데이터를 받게되고, 같은 데이터를 여러번 소비자가 받을 수도 있다.
ConnectableFlowable/ConnectableObservable 클래스
ConnectableFlowable/ConnectableObservable는 Hot Flowable/Observable이고 여러 Subscriber/Observer에서 동시에 구독할 수 있다. 또한 subscribe메서드를 호출해도 처리를 시작하지 않고 connect메서드를 호출해야 처리를 한다.
- subscriber/Observer에서 구독(connect 메서드 호출 전까지 처리되지 않음)
- connect메서드를 호출 할 때 동시에 여러 구독자에게 데이터를 통지
[Flowable/Observable로 변환하는 메서드]
- refcount() : 새로운 Flowable/Observable을 생성
- autoConnect() / autoConnect(int numberOfSubscribers) : 지정한 개수의 구독이 시작된 시점에 처리를 시작하는 Flowable/Observable을 생성
[Flowable/Observable을 Cold에서 Hot으로 변환하는 연산자]
- publish() : Flowable/Observable에서 ConnectableFlowable/ConnectableObservable을 생성하는 연산자이다. 해당 클래스를 이용하면 처리를 시작한 뒤 구독하면 구독한 이후 생성된 데이터부터 새로운 소비자에게 통지한다.
- replay() / replay(int bufferSize) / replay(long time, TimeUnit unit) : ConnectableFlowable/ConnectableObservable를 생성하는 연산자로 통지한 데이터를 캐시하고, 처리르 시작한 후 구독하면 캐시된 데이터를 먼저 새로 구독한 소비자에게 통지하며 그 후에 모든 소비자에게 같은 데이터를 통지한다. 그리고 메서드가 없으면 모든 데이터를 캐시하고 인자가 있으면 지정한 시간동안 정한 개수만큼 데이터를 캐시한다.
- share() : 여러소비자가 구독할 수 있는 Flowable/Observable을 생성한다. 다른메서드와 달리 ConnectableFlowable/ConnectableObservable를 생성하지 않고, Flowable/Observable을 구독하는 소비자가 있는 동안 중간에 새로 구독해도 같은 타임라인에서 생성되는 데이터를 통지한다.
Flowable vs Observable
RxJava의 Wiki에서 MissingBackpressureException과 OutOfMemoryError을 피하는 방법으로 제안한 기준
- Flowable
- 대량 데이터(예를 들어 10,000건 이상)를 처리할 때
- 네트워크 통신이나 데이터베이스 등의 I/O를 처리할 때
- Observable
- GUI 이벤트
- 소량의데이터(ex. 1000건 이하)를 처리할 때
- 데이터 처리가 기본적으로 동기 방식이며, 자바의 표준 Stream 대신 사용할 때
일반적으로 Observable이 Flowable보다 오버헤드가 적다고 알려져 있다.
참조
'Language > Java' 카테고리의 다른 글
[Reactive Programming] Marble Diagram(마블 다이어그램) (0) | 2023.04.26 |
---|---|
[Reactive Programming] Reactive Streams (0) | 2023.04.26 |
[Reactive Programming] Reactive System(리액티브 시스템) (0) | 2023.04.26 |
[Java] Lamda Expressions(람다식) (0) | 2023.04.20 |
[Java] Generic(제네릭) (0) | 2023.04.20 |