Today I Learned的なことをやってみる
表題の通りなんだけど、Observable#interval
はunsubscribe
しただけでは、onComplete
まで呼ばれない。
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
subscription.unsubscribe();
こんなコード(擬似コード)を書くと、
interval onNext: 0
interval onNext: 1
interval onNext: 2
...
こんなログが出て、subscription.unsubscribe()
が呼ばれたタイミングで、interval onComplete
が出力されることなくログの出力が終了する。
Observable#interval
はHot Observableで誰がsubscribe
してるとか関係ない感じなんだろうなーという認識。
どうやったらonComplete呼ばれるのん?っていうと下記スレッドでやり方が紹介されていた。
Call onComplete when an observable uses interval method.
Observable#takeUntil
を使うらしい。引数として受け取ったObservable
のonNext
が呼ばれたら、それより後に来る元のObservable
のonNext
を全部無視するオペレータだ。
PublishSubject<Void> stop = PublishSubject.create();
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(stop)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
stop.onNext(null);
こうするとonComplete
が呼ばれて終了する。
ただ、Observable#interval
使った時だけPublishSubject
でunsubscribe
とかダルいなーって思ってたらこんなことができた
PublishSubject<Void> stop = PublishSubject.create();
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(() -> {
print("interval onUnsubscribe");
stop.onNext(null);
})
.takeUntil(stop)
.subscribe(aLong -> {
print("interval onNext: %d", aLong);
}, throwable -> {
print("interval onError");
}, () -> {
print("interval onComplete");
});
Thread.sleep(3500)
subscription.unsubscribe();
ログはこんな感じ
interval onNext: 0
interval onNext: 1
interval onNext: 2
...
interval onUnsubscribe
interval onComplete
doOnUnsubscribe
はどうやらObserver
にイベントが来なくなるより前に実行されるみたい。
これでonComplete
は呼ばれるようになったけど、Observable#takeUntil
のドキュメントを見る限りだとObservable#intervale
のストリームは依然続いてそうな感じもするし、onComplete
でしたい処理がある場合以外はこんなめんどくさいことしなくていいかもしれない。