Flowable은 Observable과 사용 방식은 같습니다. 또한, Flowable과 Observable을 사이의 호환도 toObservable(), toFlowable() 함수를 지원합니다. Flowable은 RxJava 2.x 부터 새롭게 도입된 클래스 입니다.그러면 Observable과 Flowable의 차이점은 무엇 일까요?
1. subscribeOn()
subscribeOn() 함수는 Observable에서 구독자가 subscribe() 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정 합니다.
static void sleep(long millseconds) {
try{
Thread.sleep(millseconds);
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
static final class Item {
final int id;
Item(int id){
this.id = id;
System.out.println("Construct Item " + id);
}
}
Observable.range( 1, 500_000_000)
.map(Item::new)
.subscribe(item -> {
sleep(50);
System.out.println("Received Item : " + item.id);
});
출력 결과
업스트림에서 데이터의 발행과 다운스트림에서 데이터를 처리하는 속도의 차이가 없습니다. 바로 item을 배출하는 것을 확인 할수 있습니다.
2. observeOn()
observeOn() 함수는 처리된 결과를 구독자에게 전달하는 스레드를 지정 합니다. 따라서,
Observable.range( 1, 500_000_000)
.map(Item::new)
.observeOn(Schedulers.io())
.subscribe(item -> {
sleep(50);
System.out.println("Received Item : " + item.id);
});
sleep(Integer.MAX_VALUE);
출력 결과
업스트림에서 데이터의 발행과 다운스트림에서 데이터를 처리하는 속도의 차이가 큽니다. 이 경우, 메모리에 발행된 데이터가 엄청 쌓이고 있을 것 입니다. 결국 OOM(Out Of Memory)가 발생 할 것 입니다. 이것이 배압 입니다.
3. 뜨거운 Observable
PublishSubject<Integer> subject = PublishSubject.create();
subject.observeOn(Schedulers.computation())
.subscribe(data -> {
Thread.sleep(100);
Log.d("Flowable test", "" + data);
}, Throwable::printStackTrace);
for (int i = 1; i < 50_000_000; i++) {
subject.onNext(i);
}
subject.onComplete();
위와 같이, 뜨거운 Observable을 처리할 경우에 데이터의 발행과 데이터를 처리하는 속도의 차이가 큰 경우가 있습니다.
4. Observable에서 발행하는 데이터 속도가 1M/s, 네트워크 요청 속도 50/s 경우
Observable에서 발행하는 데이터 속도가 1M/s, 네트워크 요청 속도 50/s 경우, 데이터 발행 속도가 너무 빨라, 데이터는 금방 메모리에 쌓이게 되어, 메모리 누수가 발생 할 것 입니다.
5. 배압(BackPressure)
배압(BackPressure)은 업스트림에서 데이터 발행과, 다운스트림에서 데이터를 처리하는 속도의 차이가 큰 것을 배압 이라고 합니다.
6. Flowable을 통한 배압(BackPressure) 대응
배압(BackPressure) 이슈를 대응하기 위해서 Observable에서 먼저, sample(), debounce(), throttle() 함수를 통해서 데이터의 발행 속도를 제어 하는 방안을 먼저 찾는 것을 권합니다. 하지만, 그렇지 못할 경우를 위해서 Flowable 클래스를 제공 합니다. Flowable 클래스는 아래와 같은 함수를 제공 하므로써, 배압 이슈를 대응 합니다.
.onBackpressureBuffer() - 배압 이슈가 발생했을 때 별도의 버퍼에 저장 합니다. 기본적으로 128개의 버퍼가 있습니다.
.onBackpressureDrop() - 처리할 수 없어서 쌓이는 데이터를 무시합니다.
.onBackpressureLatest() - 처리할 수 없어서 쌓이는 데이터를 무시하면서, 최신 데이터만 유지 합니다.
Flowable.range(1, 50_000_000)
.onBackpressureBuffer()
//.onBackpressureDrop()
//.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(data -> {
Thread.sleep(100);
Log.d("Flowable test", "" + data);
}, Throwable::printStackTrace);
'Reactive Programing' 카테고리의 다른 글
FlatMap 파헤치기 (0) | 2021.02.02 |
---|---|
Cold Observable vs Hot Observable (PublishSubject 클래스) (0) | 2020.11.22 |
리액티브 프로그래밍 (Reactive Programming) RxJava (0) | 2020.11.01 |
Java API를 RxJava와 통합 하는 방법 (0) | 2020.10.31 |
RxJava의 Single, Maybe 클래스 (0) | 2020.10.15 |