배압(Backpressure)
생산자가 1초 간격으로 100건의 데이터를 통지하는데 소비자는 1초마다 한 건의 데이터를 처리할 수 있다면 1초뒤에 처리를 기다리는 데이터가 99건 쌓이고 그 후엔 더 많은 새로운 데이터가 처리를 기다리게 된다. 이럴경우 시간이 지나면서 데이터가 쌓이고 최신 정보를 받을 수 없고, 결국 메모리가 부족해서 시스템이 다운할 가능성이 있다.
그 문제점을 해결해 주는 것이 배압(Backpressure) 이다.
배압(Backpressure) 은 데이터 통지량을 제어하는 기능을 말하는 것으로 Flowable에만 제공한다. 배압은 데이터를 받은 측이 서로 다른 스레드에서 처리하는데 Flowable과이 데이터를 통지하는 속도가 데이를 받는 측의 처리속도 보다 빠를 때 사용된다.
처리과정
- 데이터 수신측이 지정한 개수만큼 데이터를 통지 Flowable 요청
- Flowable은 요청받은 수 만큼 데이터 통지
- 요청 수만 큼 데이터 통지 후엔 통지를 멈춘다. (통지는 멈췄지만 데이터는 생산해낸다)
- 데이터를 수신 측이 모든 데이터를 처리하면 다시 데이터를 요청
- Flowable은 요청한 수만큼 데이터를 다시 통지
Flowable이 통지를 기다리는 데이터를 어떻게 다룰 것인지는 BackpressureStrategy 에서 설정하고 다음과 같은 역할을 한다.
- 모든 데이터는 버퍼에 쌓는다.
- 통지 대기 데이터는 모두 파기한다.
- 마지막에 통지한 데이터만 버퍼에 쌓는다.
- 지정 수만큼 버퍼에 쌓고 초과하면 에러로 처리한다.
request method
request 메소드는 Subscriber가 처리할 수 있는 만큼 데이터 개수를 Flowable에 요청하고 기다린다. 그리고 SubScriber는 데이터를 받아 처리하고 요청한 만큼 데이터 처리가 완료되면 onNext 메서드로 다시 데이터 요청 수만큼 요청 반복한다. Flowable 속도는 수신 측의 속도에 맞춰 통지
[배압의 request 요청]
데이터를 통지하는 측과 수신측의 처리속도의 균형이 맞지 않으면 MissingBackpressure Exception 발생한다.
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.TimeUnit;
public class MissingBackpressureEx {
public static void main(String[] args) throws Exception {
Flowable<Long> flowable = Flowable.interval(10L, TimeUnit.MICROSECONDS)
.doOnNext(value -> System.out.println("Emit : " + value)); //통지시 정보 출력
flowable.observeOn(Schedulers.computation())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long data) {
try {
System.out.println("Waiting..........");
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Received : " + data);
}
@Override
public void onError(Throwable t) {
System.out.println("Error : " + t);
}
@Override
public void onComplete() {
System.out.println("Complete");
}
});
Thread.sleep(5000L);
}
}
[결과]
Emit : 0
Emit : 1
...
...
Emit : 13
Emit : 14
Waiting..........
Emit : 15
Emit : 16
...
...
Emit : 124
Emit : 125
Emit : 126
Emit : 127
Received : 0
Error : io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
BackpressureStrategy
Flowable이 어떤 방법으로 통지를 기다리는 데이터를 다룰지 설정한 배압의 종류이다.
create 메서드에서 Flowable을 생성할 때 두번 째 인자로 BackpressureStrategy를 지정해 생성한 Flowable이 통지 대기중인 데이터를 다루는 방법을 설정
[BackpressureStrategy 종류]
종류 | 설명 |
---|---|
BUFFER | 데이터를 통지할 수 있게 될 때까지 모든 데이터르 버퍼에 쌓는다 |
DROP | 데이터를 통지할 수 있게 될 때까지 새로 생성한 모든 데이터를 파기 |
LATEST | 가장 최신 데이터만 버퍼에 담고 새로 데이터가 생성되면 버퍼에 담긴 데이터를 최신 데이터로 교체 |
ERROR | 데이터 버퍼 크기를 넘으면 오류 통지 |
NONE | 특정 처리 작업을 실행하지 않는다. onBackpressure 메서드를 수행할대 사용 |
'Language > Java' 카테고리의 다른 글
[RxJava] fromArray, fromIterable (0) | 2023.04.26 |
---|---|
[RxJava] Just (0) | 2023.04.26 |
[Reactive Programming] RxJava 예외 처리 (0) | 2023.04.26 |
[Reactive Programming] RxJava 비동기 처리 (0) | 2023.04.26 |
[Reactive Programming] Marble Diagram(마블 다이어그램) (0) | 2023.04.26 |