前回の続きです。
Observable#using
を使うと、Observableのunsubscribe時にRealmインスタンスを同じスレッド上で閉じることができるよ、と前回の記事で書きました。
結論から言うと前回のコードだけでは足りませんでした。
大体の場合において同じスレッド上でcloseされるのですが、たまに別スレッドで閉じられてしまう、ということがわかりました。
Twitter上でうんうん言いながらRxJavaのコードを読みながら再現するコードを書こうとしてたら、@hydrakecatさんが捕捉してくださり、さくっと再現させてくれました。
やはり頑張ってRxJavaとRealmのコードを読み込むしかない
— せーい(yshrsmz) (@_yshrsmz) 2016年7月13日
@_yshrsmz うーん、再現したかも。とりあえず、このコードを何回も実行するとたまに main スレッドでクローズされますね。https://t.co/EHda8VQeEh
— Hiroshi Kurokawa (@hydrakecat) 2016年7月13日
最初は毎回再現するわけじゃないしバグなのかな? と思っていたのですが、どうやらObservable#subscribeOn
はunsubscribeするスレッドを関知しないようでした。
@_yshrsmz ううん、微妙なんですよね。subscribeOn() の JavaDoc には unsubscribe のスケジューラについて一言も書かれていませんし……。もうちょっとドキュメントとコードを当たってみます。
— Hiroshi Kurokawa (@hydrakecat) 2016年7月13日
@hydrakecat @_yshrsmz unSubscribeOnでスレッド明示すればいいって話ではない感じですか?
— ちばっちんぐ (@chibatching) 2016年7月13日
じゃあObservable#unsubscribeOn
指定したらいいじゃん? というわけにもいきません。Schedulers.io()
等のRxJavaが標準で用意しているスケジューラは、実行時にスレッドプールから適当なスレッドを渡すためです。
@_yshrsmz @chibatching あ、駄目だ。Shcedulers.io() とか使うとスレッドが一緒になる保証ないですね……。自分でシングルスレッドのスケジューラ作らないといけないかも。
— Hiroshi Kurokawa (@hydrakecat) 2016年7月13日
シングルスレッドのスケジューラだとRealm関連の操作が全部直列になってしまうしパフォーマンス的に良くないよな〜と思っていろいろ考えたのですが、妙案は思いつかず。
最後の手段、ということでRxJavaのGithubレポにissueを投げてみました。
Question about Observable.using's resourceFactory & disposeAction #4197 ReactiveX/RxJava
回答は以下のような感じでした。
Hi.
- Yes. The operator doesn't deal with scheduling.
- Not with subscribeOn; try unsubscribeOn but you need a single-threaded Scheduler as all the default ones will give you different threads most likely.
やはり、@hydrakecatさんと@chibatchingさんがおっしゃっていたようにObservable#unsbscribeOn
とシングルスレッドのスケジューラを合わせるのが正攻法のようです。
最終的な解決策は上述のissueにも書きましたが、複数のシングルスレッドスケジューラを順番に使い回す、という形になりました。
public class RealmSchedulerPool {
private final static String PREFIX = "RealmScheduler-";
private final static List<Scheduler> SCHEDULERS = new ArrayList<Scheduler>(){
{
add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "1-"))));
add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "2-"))));
add(Schedulers.from(Executors.newSingleThreadExecutor(new RxThreadFactory(PREFIX + "3-"))));
}
};
private final static AtomicLong COUNT = new AtomicLong(0L);
private RealmSchedulerPool() {
// no-op
}
public static Scheduler get() {
long current = COUNT.getAndIncrement();
return SCHEDULERS.get((int) (current % 3)); // 3 is the size of SCHEDULERS
}
}
public static <T> Observable.Transformer<T, T> doInRealmScheduler() {
return tObservable -> {
Scheduler s = RealmSchedulerPool.get();
return tObservable
.subscribeOn(s)
.unsubscribeOn(s);
};
}
これを前回のコードと合わせて下記のように使います。
asObservable()
.compose(doInRealmScheduler())
.map(realm -> realm.where(Foo.class).findAll());
助言をくださった@hydrakecatさん、@chibatchingさん、ありがとうございました。