Reactive Streams
라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘으로 인터페이스만 제공하고 구현은 각 라이브러리와 프레임워크에서 한다.
구성
데이터를 통지하는 Publisher(생산자)와 데이터를 받아 처리하는 Subscriber(소비자)로 구성된다.
Subscriber가 Publisher를 구독(Subscribe)하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있다.
- Publisher : 데이터를 통지하는 생산자
- Subscriber : 데이터를 받아 처리하는 소비자
Subscriber가 구독하고 Publisher가데이터를 통지하고 Subscriber가 데이터를 받을 때까지 처리과정의 흐름이다.
Subscriber는 Publisher를 구독하고, Publisher는 준비가 끝나면 Subscriber에게 통지한다(여기까지 onSubScribe). 통지받은 Sub는 Pub에게 받고자하는 데이터 개수를 요청한다. 만약 통지 받을 데이터 개수를 요청하지 않으면 Pub는 통지해야 할 데이터 개수 요청을 기다리므로 통지를 시작하지 않는다.
Pub는 통지받을 데이터 개수를 요청하면 데이터를 생성하고 다시 Sub에게 통지한다 (onNext 를 수행). 이 과정을 완료 또는 에러가 발생할 때가지 계속 수행되고, Sub가 완료를 요청하면 Pub가 완료를 통지한다(onComplete 라고 함). pub는 완료를 통지하면 더 이상 어느 요청도 하지 않는다. 그리고 에러가 발생하면 Sub에 발생한 객체와 함께 에러(onError)를 통지한다.
RxJava를 사용하려면 dependencies 라이브러리가 있어야한다.
//RxJava 2.x
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.10'
//RxJava 1.x
compile group: 'io.reactivex', name: 'rxjava', version: '1.3.8'
//RxJava streams
compile group: 'io.reactivex', name: 'rxjava-reactive-streams', version: '1.2.1'
Reactive Streams가 제공하는 함수
function | description |
---|---|
onSubscribe | 데이터 통지가 준비됐음을 통지 |
onNext | 데이터 통지 |
onError | 에러를 통지 |
onComplete | 완료를 통지 |
인터페이스
해당인터페이스는 rxjava-reactive-streams 패키지에 담겨있다.
interface | description |
---|---|
Publisher | 데이터를 생성하고 통지하는 인터페이스 |
Subscriber | 통지된 데이터를 전달받아 처리하는 인터페이스 |
Subscription | 데이터 개수를 요청하고 구독을 해지하는 인터페이스 |
Processor | Pub/Sub의 기능을 모두 가지고 있는 인터페이스 |
데이터를 통지하는 생산자
[Publisher.java]
public interface Publisher<T> {
//데이터를 받는 Subscribe 등록
public void subscribe(Subscriber<? super T> s);
}
데이터를 받아 처리하는 소비자
[Subscriber.java]
public interface Subscriber<T> {
//구독 시작시 처리
public void onSubscribe(Subscription s);
//데이터 통지시 처리
public void onNext(T t);
//에러 통지시 처리
public void onError(Throwable t);
//완료 통지시 처리
public void onComplete();
}
생산자와 소비자를 연결하는 인터페이스
[Subscription.java]
public interface Subscription {
//통지받을 데이터 개수 결정
public void request(long n);
//구독 해지
public void cancel();
}
Publisher와 Subscriber의 기능을 모두 가지고 있는 인터페이스
[Processor.java]
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
스트림의 생산과 소비
Observable은 구독자가 구독하는 즉시 구독자에게 이벤트를 전파하는 역할을 한다.
Observable<String> observable = Observable.create(sub -> {
sub.onNext("Hello, reactive world!"); //구독자에게 보낼 메시지를 입력
sub.onComplete(); // 스트림의 끝을 구독자에게 알림
});
RxJava 1.2.7부터 Observable을 생성하는 방식을 더 이상 사용하지 않는다. 이유는 생성할 것들이 너무 많고 구독자에게 부하를 줄 수 있다.
subscriber를 구현할 때는 Observer 메서드를 구현해야한다.
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
System.err.println(e);
}
@Override
public void onComplete() {
System.out.println("done");
}
};
observable 인스턴스와 observer와 연결한다.
observable.subscribe(observer);
그럼 다음과 같은 결과가나옴
Hello, reactive world!
done
Reactive Streams 규칙
- 구독 시작 통지(onSubscribe)는 해당 구독에서 한번만 발생
- 통지는 순차적으로 이루어짐
- null은 통지하지 않는다. null을 통지하면 NullPointerException이 발생
- Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료한다. Publisher는 처리가 끝마친 것으로 판단하고 더 이상 통지하지 않겠다는 의미이다.
- 데이터 통지는 순차적으로 이루어진다. 즉, 여러 통지를 동시에 할 수 없다. (Observable 규약때문으로 데이터 불일치를 방지)
- 처리가 종료되고 다시 같은 Publisher와 Subscriber로 subscribe 메서드를 호출하면 다시 onSubScribe메서드가 호출된다. 이유는 구독이 종료되고 다시 새로운 구독이 시작된다고 인식하기 때문이다.해당 인스턴스를 다시사용하여 subscribe를 호출하여 내부를 초기화지 않으면 의도하지 않을 결과가 발생할 수 도 있다.
- 데이터 개수 요청이나 구독 해지를 수행하는 onSubScription은 다음의 규칙을 가지고 있다.
- 데이터 개수 요청에 Long.MAX_VALUE를 설정하면 데이터 개수의 의한 통지 제한은 없어진다.
- Subscription의 메서드는 동기화된 상태로 호출해야 한다.
'Language > Java' 카테고리의 다른 글
[Reactive Programming] RxJava 비동기 처리 (0) | 2023.04.26 |
---|---|
[Reactive Programming] Marble Diagram(마블 다이어그램) (0) | 2023.04.26 |
[Reactive Programming] RxJava 개념 (0) | 2023.04.26 |
[Reactive Programming] Reactive System(리액티브 시스템) (0) | 2023.04.26 |
[Java] Lamda Expressions(람다식) (0) | 2023.04.20 |