반응형
interval
지정한 통지 간격(interval)마다 0부터 시작하는 long 타입의 숫자를 데이터로 통지하는 Flowable/Observable을 생성하는 연산자이다.
interval 메서드는 별도 설정이 없으면 Schedulers.computation()의 스케줄러에 의해 실행된다. 만약 스케줄러를 변경하려면 인자로 스케줄러를 받는 메서드를 이용한다.
또한 interval 메서드로 생성한 Flowable/Observable은 완료 통지가 필요한 경우 take 메서드를 통해 통지할 데이터 개수를 제한하는 작업을 해야한다.
[소스코드]
public class Main {
public static void main(String[] args) throws Exception {
interval();
}
static void interval() throws Exception {
Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS);
flowable.subscribe(new PrintSubscriber<>());
Thread.sleep(5000L);
}
}
[Subscribe 코드]
import io.reactivex.subscribers.DisposableSubscriber;
public class PrintSubscriber<T> extends DisposableSubscriber<T> {
private String label;
public PrintSubscriber() {
}
public PrintSubscriber(String label) {
this.label = label;
}
@Override
public void onNext(T data) {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : " + data) : (threadName + " : "+ label + " : "+ data);
System.out.println(resultMsg);
}
@Override
public void onError(Throwable throwable) {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : Exception=" + throwable) : (threadName + " : "+ label + " : "+ throwable);
System.out.println(resultMsg);
}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
String resultMsg = label == null ? (threadName + " : Complete") : (threadName + " : "+ label + " : Complete");
System.out.println(resultMsg);
}
}
[결과]
RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
반응형
'Language > Java' 카테고리의 다른 글
[RxJava] defer (0) | 2023.04.26 |
---|---|
[RxJava] timer (0) | 2023.04.26 |
[RxJava] range, rangeLong (0) | 2023.04.26 |
[RxJava] fromCallable (0) | 2023.04.26 |
[RxJava] fromArray, fromIterable (0) | 2023.04.26 |