Reactive Programing

Java API를 RxJava와 통합 하는 방법

태인킴 2020. 10. 31. 17:41
반응형


1. Observable.fromCallable()

Java의 Callable을 통합하기 위해서 Observable.fromCallable을 사용할수 있습니다.

Observable.fromCallable(() -> someOperation())
    .subscribeOn(Schedulers.io())

 

2. Observable.fromFuture()

Java의 Future을 통합하기 위해서 Observable.fromFuture을 사용할수 있습니다.

Observable.fromFuture(new FutureTask<>(() -> longRunningOperation()))

 

 

3. Emitter(이미터) 인터페이스 사용

Emitter(이미터) 인터페이스를 사용하여 아이템 방출을 수동적으로 제어 할수 있습니다.

.onNext() : Observable에 값을 제공 합니다.

.onError() : Observable에 에러 또는 예외를 알립니다.

.onComplete() : Observable에게 안전하게 종료할 수 있음을 알려 줍니다.

Observable.create(new ObservableOnSubscribe<Integer>() {
     @Override
     public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            emitter.onNext(200);
            emitter.onComplete();
     }
});

Observable.create(emitter -> {
     emitter.onNext(100);
     emitter.onNext(200);
     emitter.onComplete();
 });

여기서 중요한 것이 .onComplete() 호출을 놓치면 안됩니다. 따라서 다음과 같이 구현하는 것이 좋습니다.

 

Observable.create(emitter -> {
     try {
         emitter.onNext(someOperation());
     } catch (Exception ex) {
         emitter.onError(ex);
     } finally {
         emitter.onComplete();
     }
});

따라서, finally 구문에서 .onComplete()를 호출시켜 줍니다. 그리고 someOperation() 함수로 추상화 시켜주는 것이 좋습니다. try-catch-finally로 Emitter(이미터)의 flow 로직만 노출 시켜 주는것이 좋다고 생각 합니다.  

 

4. Emitter 메모리 누수

Observable.create(emitter -> {
    helloText.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View view) {
            emitter.onNext(view);
        }
    });
});

위 소스코드를 보면, View.onClickListener가 emitter에 대한 참조가 있고, Observable는 emitter(이미터)에 대한 참조가 있습니다. 따라서, View.onClickListener는 익명 객체 이기 때문에, View.onClickListener의 메모리가 제대로 해제가 안되면, emitter, Observable 역시도 메모리 해제가 되지 않습니다. 메모리 누수가 일어납니다.

 

Observable.create(emitter -> {
     emitter.setCancellable(() -> helloText.setOnClickListener(null));
     helloText.setOnClickListener(v -> emitter.onNext(v));
});

따라서, emitter.setCancellable() 을 이용하여 메모리를 해제 시켜 줄수 있습니다.

 

Observable.create(emitter -> {
    emitter.setDisposable(new MainThreadDisposable() {
        @Override
        protected void onDispose() {
            helloText.setOnClickListener(null);
        }
    });
    helloText.setOnClickListener(v -> emitter.onNext(v));
});

만약, RxAndroid를 사용한다면, 위와 같이 .setDisposable()을 이용해서 View.OnClickListener()를 null로 만들어 줄수도 있습니다. 또한 Emitter(이미터)는 위와 같이 Listener의 아이템을 방출 할때 제어하기 좋습니다.

반응형