Today I Learned的なことをやってみる
表題の通りなんだけど、Observable#intervalはunsubscribeしただけでは、onCompleteまで呼ばれない。
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
subscription.unsubscribe();こんなコード(擬似コード)を書くと、
interval onNext: 0
interval onNext: 1
interval onNext: 2
...こんなログが出て、subscription.unsubscribe()が呼ばれたタイミングで、interval onCompleteが出力されることなくログの出力が終了する。
Observable#intervalはHot Observableで誰がsubscribeしてるとか関係ない感じなんだろうなーという認識。
どうやったらonComplete呼ばれるのん?っていうと下記スレッドでやり方が紹介されていた。
Call onComplete when an observable uses interval method.
Observable#takeUntilを使うらしい。引数として受け取ったObservableのonNextが呼ばれたら、それより後に来る元のObservableのonNextを全部無視するオペレータだ。
PublishSubject<Void> stop = PublishSubject.create();
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(stop)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
stop.onNext(null);こうするとonCompleteが呼ばれて終了する。
ただ、Observable#interval使った時だけPublishSubjectでunsubscribeとかダルいなーって思ってたらこんなことができた
PublishSubject<Void> stop = PublishSubject.create();
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(() -> {
print("interval onUnsubscribe");
stop.onNext(null);
})
.takeUntil(stop)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
subscription.unsubscribe();ログはこんな感じ
interval onNext: 0
interval onNext: 1
interval onNext: 2
...
interval onUnsubscribe
interval onCompletedoOnUnsubscribeはどうやらObserverにイベントが来なくなるより前に実行されるみたい。
これでonCompleteは呼ばれるようになったけど、Observable#takeUntilのドキュメントを見る限りだとObservable#intervaleのストリームは依然続いてそうな感じもするし、onCompleteでしたい処理がある場合以外はこんなめんどくさいことしなくていいかもしれない。