거꾸로 바라본 세상
article thumbnail
반응형

배압(Backpressure)

생산자가 1초 간격으로 100건의 데이터를 통지하는데 소비자는 1초마다 한 건의 데이터를 처리할 수 있다면 1초뒤에 처리를 기다리는 데이터가 99건 쌓이고 그 후엔 더 많은 새로운 데이터가 처리를 기다리게 된다. 이럴경우 시간이 지나면서 데이터가 쌓이고 최신 정보를 받을 수 없고, 결국 메모리가 부족해서 시스템이 다운할 가능성이 있다.

그 문제점을 해결해 주는 것이 배압(Backpressure) 이다.

배압(Backpressure) 은 데이터 통지량을 제어하는 기능을 말하는 것으로 Flowable에만 제공한다. 배압은 데이터를 받은 측이 서로 다른 스레드에서 처리하는데 Flowable과이 데이터를 통지하는 속도가 데이를 받는 측의 처리속도 보다 빠를 때 사용된다.

처리과정

  1. 데이터 수신측이 지정한 개수만큼 데이터를 통지 Flowable 요청
  2. Flowable은 요청받은 수 만큼 데이터 통지
  3. 요청 수만 큼 데이터 통지 후엔 통지를 멈춘다. (통지는 멈췄지만 데이터는 생산해낸다)
  4. 데이터를 수신 측이 모든 데이터를 처리하면 다시 데이터를 요청
  5. Flowable은 요청한 수만큼 데이터를 다시 통지

Flowable이 통지를 기다리는 데이터를 어떻게 다룰 것인지는 BackpressureStrategy 에서 설정하고 다음과 같은 역할을 한다.

  • 모든 데이터는 버퍼에 쌓는다.
  • 통지 대기 데이터는 모두 파기한다.
  • 마지막에 통지한 데이터만 버퍼에 쌓는다.
  • 지정 수만큼 버퍼에 쌓고 초과하면 에러로 처리한다.

request method

request 메소드는 Subscriber가 처리할 수 있는 만큼 데이터 개수를 Flowable에 요청하고 기다린다. 그리고 SubScriber는 데이터를 받아 처리하고 요청한 만큼 데이터 처리가 완료되면 onNext 메서드로 다시 데이터 요청 수만큼 요청 반복한다. Flowable 속도는 수신 측의 속도에 맞춰 통지

[배압의 request 요청]

데이터를 통지하는 측과 수신측의 처리속도의 균형이 맞지 않으면 MissingBackpressure Exception 발생한다.

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;

public class MissingBackpressureEx {
    public static void main(String[] args) throws Exception {
        Flowable<Long> flowable = Flowable.interval(10L, TimeUnit.MICROSECONDS)
                .doOnNext(value -> System.out.println("Emit : " + value)); //통지시 정보 출력

        flowable.observeOn(Schedulers.computation())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long data) {
                        try {
                            System.out.println("Waiting..........");
                            Thread.sleep(1000L);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("Received : " + data);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("Error : " + t);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Complete");
                    }
                });
        Thread.sleep(5000L);
    }
}

[결과]


Emit : 0
Emit : 1
...
...
Emit : 13
Emit : 14
Waiting..........
Emit : 15
Emit : 16
...
...

Emit : 124
Emit : 125
Emit : 126
Emit : 127
Received : 0
Error : io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
BackpressureStrategy

Flowable이 어떤 방법으로 통지를 기다리는 데이터를 다룰지 설정한 배압의 종류이다.

create 메서드에서 Flowable을 생성할 때 두번 째 인자로 BackpressureStrategy를 지정해 생성한 Flowable이 통지 대기중인 데이터를 다루는 방법을 설정

[BackpressureStrategy 종류]

종류 설명
BUFFER 데이터를 통지할 수 있게 될 때까지 모든 데이터르 버퍼에 쌓는다
DROP 데이터를 통지할 수 있게 될 때까지 새로 생성한 모든 데이터를 파기
LATEST 가장 최신 데이터만 버퍼에 담고 새로 데이터가 생성되면 버퍼에 담긴 데이터를 최신 데이터로 교체
ERROR 데이터 버퍼 크기를 넘으면 오류 통지
NONE 특정 처리 작업을 실행하지 않는다. onBackpressure 메서드를 수행할대 사용
반응형
profile

거꾸로 바라본 세상

@란지에。

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!