비동기 처리
비동기 처리는 어던 작업을 실행하는 동안 해당 처리가 끝나기를 기다리지 않고 다른 작업을 수행할 수 있는 것을 말한다.
RxJava는 비동기 처리를 수행하는데 필요한 API를 제공하고 기존에 구축한 비지니스 로직에 영향을 주지 않고 생산자 또는 소비자의 작업을 비동기로 처리하도록 교체할 수 있다. 또한, 용도별로 적절하게 스레드를 관리하는 클래스를 제공하여 직접 스레드를 관리하는 번거로움이 줄었다.
RxJava 비동기 처리
RxJava에서 개발자가 직접 비동기 처리를 하도록 설정하거나 연산자 내에서 시간을 다루는 작업을 하지 않는 한, 생산자의 처리 작업을 실행하는 스레드에서 각 연산자의 처리 작업과 소비자의 처리 작업이 실행된다. 개발자가 직접 비동기 처리를 하도록 설정하면 생산자와 연산자, 소비자가 처리 작업을 실행할 스레드를 분리할 수 있다.
예를들어, 1초마다 데이터를 통지하는 Flowable을 interval 메서드로 실행하더라도 통지된 데이터는 받은 측의 처리 작업이 1초보다 길어지면 Flowable은 1초마다 데이터를 통지할 수 없게 된다.
아래 코드는 생산자가 1초마다 데이터를 통지해야 하는데, 처리하는 측의 속도에 영향을 받아 2초마다 데이터를 통지한다. 이처럼 같은 스레드에서 처리 작업을 수행할 때는 데이터를 보내고 받는 측의 양향을 고려해야 한다. 그래서 개발자는 create 메서드 등으로 통지 처리를 구현할 때 데이터를 받는 측의 처리속도에 영향을 받지 않도록 구현해야 한다.
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
public class SyncSlower {
public static void main(String[] args) throws Exception {
Flowable.interval(1000L, TimeUnit.MICROSECONDS)
.doOnNext(data-> System.out.println("Emit : "+ System.currentTimeMillis() + " ms : "+ data)) //통지할때 데이터 시간 출력
.subscribe(data -> Thread.sleep(2000L)); //구독, 무거운 처리 작업수행을 가정
Thread.sleep(5000L);
}
}
스케줄
스케줄러는 스레드를 관리하는 클래스로 RxJava에서 표준 API를 사용하지 않고 비동기 처리를 할 수 있다. Schedulers클래스의 메서드를 호출 하여 사용한다.
메서드 | 내용 |
---|---|
computation | 연산 처리를 할 때 사용하는 스케줄러로, 논리프로세서 수와 같은 수만큼 스레드를 캐시한다. I/O 처리 작업에서는 사용할 수 없다. |
io | I/O처리 작업을 할 때 사용하는 스케줄러로 스레드 풀에서 스레드를 가져오고, 필요에 따라 새로운 스레드를 생성한다 |
single | 싱글 스레드에서 처리 작업을 할 때 사용하는 스케줄러다 |
newThread | 매번 새로운 스레드를 생성하는 스케줄러다 |
from(Executor executor) | 지정한 Executor가 생성한 스레드에서 처리 작업을 수행하는 스케줄러다 |
trampoline | 현재 Thread Queue에 처리작업을 넣는 스케줄러로, 이미 다른 처리 작업이 Queue에 들어가 있다면 Queue에 들어 있는 작업의 처리가 끝난 후 새로 등록한 처리 작업을 수행한다. |
computation과 io 메서드로 얻은 스케줄러는 비슷한 역할을 하고, 호출할 때 Thread Pool에서 서로다른 Thread를 가져온다.
io
- 스레드 풀에 더 이상가져올 스래드가 없을 때 스레드를 생성하고 I/O 처리 작업 중 대기시간이 발생할 가능성이 있어 논리 프로세서 수를 초과해도 스레드를 생성해 동시 처리작업을 한다.
- 서로 다른 스레드가 동시에 접근하는 공유 I/O처리 작업에 적절하지 않다. 그래서 Thread Safe를 보장하게 구현하거나 single 메서드로 가져온 스케줄러가 제공하는 공통 스레드에서만 처리 작업을 해야한다.
computation
- 논리 프로세서 수가 넘지 않는 범위 내에서 스레드를 주고 받는다.
- 연산 처리 작업에 대기하는 일이 없기에 논리 프로세서 수를 초과하는 스레드로 처리작업을 하게 되면 실행 스레드를 전환하는 일이 발생하게 되어 스레드 전환 비용에 의해 성능이 저하될 수 있다.
RxJava내에서 스케줄러를 별도로 설정하지 않고, 연산자, 시간을 다루는 처리작업을 하지 않는 이상 생산자는 기본 스레드에서 모든 처리 작업을 수행한다.
subscribeOn 메서드
subscribeOn 메서드는 생산자의 처리 작업을 어떤 스케줄러에서 실행할지 설정하는 메서드이다. (생산자는 Flowable/Observable이다)
subscribeOn 메서드는 생산자가 처리작업을 수행할 스케줄러를 설정 할 때 사용하므로 최초 1회만 설정할 수 있다.
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class SubscribeOnMain {
public static void main(String[] args) throws Exception {
Flowable.just(1,2,3,4,5)
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.single())
.subscribe(data-> {
System.out.println(Thread.currentThread().getName() + " : "+ data);
});
Thread.sleep(500);
}
}
[결과]
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 5
스케줄러는 한 번만 설정할 수 있으므로 interval 메서드로 생성한 생산자가 스케줄러를 자동으로 지정할 때 SubscriveOn 메서드로 다른 스케줄러를 지정해도 반영되지 않는다.
observeOn 메서드
observeOn 메서드는 데이터를 받는 측의 처리 작업을 어떤 스케줄러에서 실행할지 설정하는 메서드로, observeOn 메서드는 데이터를 받은 측의 스케줄러를 지정함으로 연산자마다 서로 다른 스케줄러를 지정할 수 있다.
[observeOn 메서드 argument]
observeOn(Scheduler scheduler)
observeOn(Scheduler scheduler, Boolean delayError)
observeOn(Scheduler scheduler, Boolean delayError, int bufferSize)
type | 설명 |
---|---|
Scheduler | 스레드를 관리하는 스케줄러 클래스 |
Boolean | true 일 때 에러가 발생해도 즉시 통지하지 않고, 버퍼에 담긴 데이터를 모두 통지한 후에 에러를 통지, false일 때 에러가 밸상하면 바로통지한다. (default : false) |
int | 통지를 기다리는 데이터를 버퍼에 담는 크기로 기본값은 128이다. |
[observeOn 메서드 사용 예]
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import java.util.concurrent.TimeUnit;
public class ObserveOnMain {
public static void main(String[] args) throws Exception {
Flowable<Long> flowable = Flowable
.interval(300L, TimeUnit.MILLISECONDS) //0.3초마다 0부터 시작하는 데이터를 통지하는 lowable 생성
.onBackpressureDrop();
flowable.observeOn(Schedulers.computation(), false , 1) //비동기로 받게 하고, 버퍼 크기를 1로 설정
.subscribe(new ResourceSubscriber<Long>() { //구독
@Override
public void onNext(Long data) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(1);
}
System.out.println(Thread.currentThread().getName() + " : " + data);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(7000L);
}
}
[결과]
RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 8
RxComputationThreadPool-1 : 12
RxComputationThreadPool-1 : 16
연산자 내에서 생성되는 비동기 Flowable/Observable
flatMap 메서드
flatMap 메서드는 데이터를 받으면 새로운 Flowable/Observable을 생성하고 이를 실행하면 통지되는 데이터를 메서드의 결과물로 통지하는 연산자다.
flatMap 메서드는 처리 성능이 중요할 때 사용한다.
[flatMap 사용 예]
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
public class FlowMap {
public static void main(String[] args) throws Exception {
Flowable<String> flowable = Flowable.just("A", "B", "C")
.flatMap(data -> {
return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
});
flowable.subscribe(data -> {
System.out.println(Thread.currentThread().getName() + " : " + data);
});
Thread.sleep(2000L);
}
}
[결과]
RxComputationThreadPool-3 : C
RxComputationThreadPool-2 : A
RxComputationThreadPool-2 : B
결과를 보면 순서가 다르게 나오는데 각스레드에서 받은 데이터를 순서대로 통지되므로 원본 데이터의 통지 순서는 달라질 수 있다.
concatMap 메서드
concatMap 메서드는 받은 데이터로 메서드 내부에 Flowable/Observable을 생성하고 이 Flowable/Observable을 하나 씩 순서대로 실행해 통지된 데이터를 그 결과물로 통지하는 연산자이다. 이 과정에서 생성 되는 Flowable/Observable은 다른 스레드에서 처리를 해도 영향을 받지 않고 새로생성한 Flowable/Observable의 처리 데이터를 받은 순서대로 통지한다.
성능에 관계없이 데이터 순서가 중요할 때는 concatMap을 사용한다.
[concatMap 메서드 코드]
import io.reactivex.Flowable;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
public class ConcatMap {
public static void main(String[] args) throws Exception {
Flowable<String> flowable = Flowable.just("A", "B", "C")
.concatMap(data -> {
return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
});
flowable.subscribe(data -> {
String threadName = Thread.currentThread().getName();
String tiem = LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"));
System.out.println(threadName + " : data=" + data + ", time=" + tiem);
});
Thread.sleep(4000L);
}
}
[결과]
RxComputationThreadPool-1 : data=A, time=32.439
RxComputationThreadPool-2 : data=B, time=33.448
RxComputationThreadPool-3 : data=C, time=34.450
concatMapEager 메서드
concatMapEager 메서드는 데이터를 받으면 새로운 Flowable/Observable을 생성하고 이를 즉시 실행하고 그 결과를 받은 데이터를 원본 데이터 순서대로 통지하는 연산자이다. 실행은 flatMap 처럼 동시에 실행하여 결과를 통지할 때는 concatMap 과같이 통지한다.
데이터의 순서와 성능 모두가 중요하다면 concatMapEager 메서드를 사용하는 것이 적합하다. 하지만 통지 전 까지 데이터를 버퍼에 쌓아둬야 하므로 대량의 데이터를
전송할 때 메모리가 부족해질 위험이 있다.
[concatMapEager 코드]
import io.reactivex.Flowable;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
public class ConcatMapEager {
public static void main(String[] args) throws Exception {
Flowable<String> flowable = Flowable.just("A", "B", "C")
.concatMapEager(data -> {
return Flowable.just(data).delay(1000L, TimeUnit.MILLISECONDS);
});
flowable.subscribe(data -> {
String threadName = Thread.currentThread().getName();
String tiem = LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"));
System.out.println(threadName + " : data=" + data + ", time=" + tiem);
});
Thread.sleep(2000L);
}
}
[결과]
RxComputationThreadPool-1 : data=A, time=13.345
RxComputationThreadPool-1 : data=B, time=13.352
RxComputationThreadPool-1 : data=C, time=13.352
다른 스레드 간 공유되는 객체
RxJava는 외부 공유 객체간에 데이터를 다룰 때 순차성을 잃게 될 수 있다. 그래서 RxJava는 여러 개의 Flowable/Observable을 결합하고 새로운 Flowable을 생성하는 merge 메서드를 제공한다. merge를 사용하면 다른 스레드에 있는 여러개의 Flowable/Observable이라도 해도 순차적으로 데이터를 받을 수 있다.
공유객체가 있을 때는 merge 메소드를 이용한다.
merge 메소드를 사용하지 않았을 때
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MergeMain {
public static void main(String[] args) throws Exception {
final Cnts cnt = new Cnts();
Flowable<Integer> source1 = Flowable.range(1, 10000) //10000번 호출
.subscribeOn(Schedulers.computation()) //Flowable이 다른 스레드에서 처리작업을 한다.
.observeOn(Schedulers.computation()); //다른 스레드에서 처리작업을 함
Flowable<Integer> source2 = Flowable.range(1, 10000)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation());
Flowable.merge(source1, source2)
.subscribe(
data -> {
cnt.increment();
},
error -> System.out.println("에러=>" + error),
() -> System.out.println("counter.get()=" + cnt.get()));
Thread.sleep(1000L);
}
}
class Cnts {
private volatile int count;
void increment() {
count++;
}
int get() {
return count;
}
}
[결과]
counter.get()=18923
counter.get()=19867
merge 메소드를 사용하여 처리했을 때
[코드]
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MergeMain {
public static void main(String[] args) throws Exception {
final Cnt cnt = new Cnt();
Flowable<Integer> source1 = Flowable.range(1, 10000) //10000번 호출
.subscribeOn(Schedulers.computation()) //Flowable이 다른 스레드에서 처리작업을 한다.
.observeOn(Schedulers.computation()); //다른 스레드에서 처리작업을 함
Flowable<Integer> source2 = Flowable.range(1, 10000)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation());
Flowable.merge(source1, source2)
.subscribe(
data -> {
cnt.increment();
},
error -> System.out.println("에러=>" + error),
() -> System.out.println("counter.get()=" + cnt.get()));
Thread.sleep(1000L);
}
}
class Cnt {
private volatile int count;
void increment() {
count++;
}
int get() {
return count;
}
}
[결과]
counter.get()=20000
참조
'Language > Java' 카테고리의 다른 글
[Reactive Programming] RxJava 리소스 관리 (0) | 2023.04.26 |
---|---|
[Reactive Programming] RxJava 예외 처리 (0) | 2023.04.26 |
[Reactive Programming] Marble Diagram(마블 다이어그램) (0) | 2023.04.26 |
[Reactive Programming] Reactive Streams (0) | 2023.04.26 |
[Reactive Programming] RxJava 개념 (0) | 2023.04.26 |