码仔漫画:一个Bug引发的RxJava血案
class MainActivity : AppCompatActivity() {
private val refreshProcessor = PublishProcessor.create()
override fun onCreate(savedInstanceState: Bundle?) {
refreshButton.setOnClickListener {
refreshProcessor.onNext(Unit)
}
refreshProcessor
.flatMapSingle(networkRequest::fetchData)
.subscribeOn(backgroundScheduler)
.observeOn(mainScheduler)
.subscribe(view::render)
}
}
不够的。您的Rx调用链将在调用onNext的线程上执行,而且在我们的例子中,这是主线程,因此onClickListeners是在主线程上得到通知。
最糟糕的是,在大多数情况下,您的应用程序不会像我们的Demo中那样崩溃,但是主逻辑将在主线程上执行,这可能会导致UI上的丢帧和糟糕的用户体验。
如何解决问题呢?
这个问题没有通用的解决办法。开发人员需要评估这些问题中的每一个,并找到针对特定问题的最佳解决方案。我们需要意识到这一点,但又不得不调试应用程序中的每一种可能的Rx调用链来找出有问题的那个。
我们希望我们的应用程序在发生类似的事情时“抱怨”一下。我的意思是:它能打断DEBUG构建并将日志记录到RELEASE构建中。
在简单思考如何实现后,可以得出结论:如果我们想切换到主线程,这意味着我们已经不想再进入主线程了。
这很容易用.compose()操作符实现,在切换到主线程之前,我们先用它检查当前线程是否是主线程。即使这样做可行,我们也不希望有任何额外的操作符开销。
class OnRescheduleNotifyMainScheduler : Scheduler() {
private val mainScheduler = AndroidSchedulers.mainThread()
override fun createWorker() = object : Worker() {
private val worker = mainScheduler.createWorker()
override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (Looper.myLooper() == Looper.getMainLooper()) {
logOrError()
}
return worker.schedule(run, delay, unit)
}
override fun dispose() = worker.dispose()
override fun isDisposed() = worker.isDisposed
}
}
publishProcessor
.filter(this::isValid)
.map(this::toViewModel)
.doOnNext(this::log)
.observeOn(mainScheduler)
.subscribe(view::render)
通过调试我们可以发现:当调用.subscribe()时,订阅过程的启动,将包括三个步骤:
订阅上游流
通知下游onsubscribed()
向上游索取item
就像下面这样:
我们注意到,当.observeOn()操作符请求数据时,它在提供的调度程序(即Demo中的onRescheduleNotifyMainscheduler)上调度一个任务,但从当前线程调度一个任务,该线程也是主线程,因为RxJava链中没有任何.subscribeOn(backgroundscheduler)。
We added the .subscribeOn() operator to the middle of our chain and started debugging again.
publishProcessor
.filter(this::isValid)
.map(this::toViewModel)
.subscribeOn(backgroundScheduler)
.doOnNext(this::log)
.observeOn(mainScheduler)
.subscribe(view::render)
subscribeOn 运算符只将订阅进程切换到所需的线程,但这并不意味着项目将在该线程上发出。我们已经说过订阅过程由三个步骤组成,.subscribeOn()操作符将这三个步骤切换到指定的线程。它的作用就像是链中的最后一环,当有人订阅它时,它会立即在下游调用 onSubscribed()。当下游向它请求数据时,它会订阅上游,并且会在提供的线程上调用subscribe()方法。
总结
近期文章:
今日问题:
大家有遇到什么古怪的RxJava问题吗?
专属升级社区:《这件事情,我终于想明白了》