Android和iOS开发中的异步处理(四)——异步任务和队列
导读:本文与前面三篇没有特别强烈的依赖关系,可以直接阅读本文。
在本篇文章中,我们主要讨论在客户端编程中经常使用的队列结构,它的异步编程方式以及相关的接口设计问题。
前几天,有位同事跑过来一起讨论一个技术问题。情况是这样的,他最近在开发一款手游,用户在客户端上的每次操作都需要向服务器同步数据。本来按照传统的网络请求处理方式,用户发起操作后,需要等待操作完成,这时界面要显示一个请求等待的过程(比如转菊花)。当请求完成了,客户端显示层才更新,用户也才能发起下一个操作。但是,这个游戏要求用户能在短时间内连续做很多操作。如果每个操作都要经历一个请求等待的过程,无疑体验是很糟糕的。
https://github.com/tielei/AsyncProgrammingDemos
概述
发送聊天消息。现在一般的聊天软件都允许用户连续输入多条聊天消息,也就是说,用户不用等待前一条消息发送成功了,再键入第二条消息。系统会保证用户的消息有序,而且由于网络状况不好而发送失败的消息会经历若干次重试,从而保证消息尽力送达。这其实背后有一个消息发送队列,它对消息进行排队处理,并且在错误发生时进行有限的重试。
一次上传多张照片。如果用户能够一次性选中多张照片进行上传操作,这个上传过程时间会比较长,一般需要一个或多个队列。队列的重试功能还能够允许文件的断点续传(当然这要求服务端要有相应的支持)。
将关键的高频操作异步化,提升体验。比如前面提到的那个游戏连续操作的例子,再比如在微信朋友圈发照片或者评论别人,都不需要等待本次网络请求结束,就可以进行后续操作。这背后也隐藏着一个队列机制。
介绍传统的线程安全队列TSQ(Thread-Safe Queue)。
适合客户端编程环境的无锁队列。这一部分遵循异步任务的经典回调方式(Callback)来设计接口。关于异步任务的回调相关的详细讨论,请参见这个系列的第二篇。
基于RxJava响应式编程的思想实现的队列。在这一部分,我们会看到RxJava对于异步任务的接口设计会产生怎样的影响。
Thread-Safe Queue
它需要额外启动一个单独的线程作为消费者。
更适合客户端环境的“主线程->异步线程->主线程”的编程模式(参见这个系列的第一篇中Run Loop那一章节的相关描述),使得生产者和消费者可以都运行在主线程中,这样就不需要一个Thread-Safe的队列,而是只需要一个普通队列就行了(下一章要讲到)。
基于Callback的任务队列
/**
* 唯一标识当前任务的ID
* @return
*/
String getTaskId();
/**
* 由于任务是异步任务,
* 那么start方法被调用只是启动任务;
* 任务完成后会回调TaskListener.
*
* 注: start方法需在主线程上执行.
*/
void start();
/**
* 设置回调监听.
* @param listener
*/
void setListener(TaskListener listener);
/**
* 异步任务回调接口.
*/
interface TaskListener {
/**
* 当前任务完成的回调.
* @param task
*/
void taskComplete(Task task);
/**
* 当前任务执行失败的回调.
* @param task
* @param cause 失败原因
*/
void taskFailed(Task task, Throwable cause);
}
}
/**
* 向队列中添加一个任务.
* @param task
*/
void addTask(Task task);
/**
* 设置监听器.
* @param listener
*/
void setListener(TaskQueueListener listener);
/**
* 销毁队列.
* 注: 队列在最后不用的时候, 应该主动销毁它.
*/
void destroy();
/**
* 任务队列对外监听接口.
*/
interface TaskQueueListener {
/**
* 任务完成的回调.
* @param task
*/
void taskComplete(Task task);
/**
* 任务最终失败的回调.
* @param task
* @param cause 失败原因
*/
void taskFailed(Task task, Throwable cause);
}
}
private static final String TAG = "TaskQueue";
/**
* Task排队的队列.
* 不需要thread-safe
*/
private Queue<Task> taskQueue = new LinkedList<Task>();
private TaskQueueListener listener;
private boolean stopped;
/**
* 一个任务最多重试次数.
* 若重试次数超过MAX_RETRIES,
* 任务则最终失败.
*/
private static final int MAX_RETRIES = 3;
/**
* 当前任务的执行次数记录
* (当尝试超过MAX_RETRIES时就最终失败)
*/
private int runCount;
@Override
public void addTask(Task task) {
//新任务加入队列
taskQueue.offer(task);
task.setListener(this);
if (taskQueue.size() == 1 && !stopped) {
//当前是第一个排队任务, 立即执行它
launchNextTask();
}
}
@Override
public void setListener(TaskQueueListener listener) {
this.listener = listener;
}
@Override
public void destroy() {
stopped = true;
}
private void launchNextTask() {
//取当前队列头的任务, 但不出队列
Task task = taskQueue.peek();
if (task == null) {
//impossible case
Log.e(TAG, "impossible: NO task in queue, unexpected!");
return;
}
Log.d(TAG, "start task (" + task.getTaskId() + ")");
task.start();
runCount = 1;
}
@Override
public void taskComplete(Task task) {
Log.d(TAG, "task (" + task.getTaskId() + ") complete");
finishTask(task, null);
}
@Override
public void taskFailed(Task task, Throwable error) {
if (runCount < MAX_RETRIES && !stopped) {
//可以继续尝试
Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount);
task.start();
runCount++;
}
else {
//最终失败
Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount);
finishTask(task, error);
}
}
/**
* 一个任务最终结束(成功或最终失败)后的处理
* @param task
* @param error
*/
private void finishTask(Task task, Throwable error) {
//回调
if (listener != null && !stopped) {
try {
if (error == null) {
listener.taskComplete(task);
}
else {
listener.taskFailed(task, error);
}
}
catch (Throwable e) {
Log.e(TAG, "", e);
}
}
task.setListener(null);
//出队列
taskQueue.poll();
//启动队列下一个任务
if (taskQueue.size() > 0 && !stopped) {
launchNextTask();
}
}
}
进出队列的所有操作(
offer
,peek
,take
)都运行在主线程,所以队列数据结构不再需要线程安全。我们选择了LinkedList的实现。任务的启动执行,依赖两个机会:
任务进队列
addTask
的时候,如果原来队列为空(当前任务是第一个任务),那么启动它;一个任务执行完成(成功了,或者最终失败了)后,如果队列里有排队的其它任务,那么取下一个任务启动执行。
任务一次执行失败,并不算失败,还要经过若干次重试。如果重试次数超过
MAX_RETRIES
,才算最终失败。runCount
记录了当前任务的累计执行次数。
基于RxJava的任务队列
/**
* 向队列中添加一个任务.
*
* @param task
* @param <R> 异步任务执行完要返回的数据类型.
* @return 一个Observable. 调用者通过这个Observable获取异步任务执行结果.
*/
<R> Observable<R> addTask(Task<R> task);
/**
* 销毁队列.
* 注: 队列在最后不用的时候, 应该主动销毁它.
*/
void destroy();
}
原来的回调接口
TaskQueueListener
没有了。异步接口
addTask
原来没有返回值,现在返回了一个Observable。调用者拿到这个Observable,然后去订阅它(subscribe),就能获得任务执行结果(成功或失败)。这里的改动很关键。本来addTask
什么也不返回,要想获得结果必须监听一个回调接口,这是典型的异步任务的运作方式。但这里返回一个Observable之后,让它感觉上非常类似一个同步接口了。说得再抽象一点,这个Observable是我们站在当下对于未来的一个指代,本来还没有运行的、发生在未来的虚无缥缈的任务,这时候有一个实实在在的东西被我们抓在手里了。而且我们还能对它在当下就进行很多操作,并可以和其它Observable结合。这是这一思想真正的强大之处。
* 异步任务接口定义.
*
* 不再使用TaskListener传递回调,
* 而是使用Observable.
*
* @param <R> 异步任务执行完要返回的数据类型.
*/
public interface Task <R> {
/**
* 唯一标识当前任务的ID
* @return
*/
String getTaskId();
/**
*
* 启动任务.
*
* 注: start方法需在主线程上执行.
*
* @return 一个Observable. 调用者通过这个Observable获取异步任务执行结果.
*/
Observable<R> start();
}
总结
再说一下TSQ
本文的任务队列设计中所忽略的
本文只设计了任务的成功和失败回调,没有执行进度回调。
本文没有涉及到任务取消和暂停的问题(我们下一篇文章会涉及这个话题)。
任务队列的一些细节参数应该是可以由使用者设置的,比如最大重试次数。
长生命周期的队列和短生命周期的页面之间的交互,本文没有考虑。在GitHub实现的演示代码中,为了简单起见,演示页面关闭后,任务队列也销毁了。但实际中不应该是这样的。关于“长短生命周期的交互”,我后来发现也是一个比较重要的问题,也许后面我们有机会再讨论。
在Android中,类似任务队列这种可能长时间后台运行的组件,一般外层会使用Service进行封装。
任务队列对于失败重试的处理,要求服务器慎重地对待去重问题。
监听到任务队列失败发生之后,错误处理变得复杂。
RxJava的优缺点
RxJava是个比较重的框架,它非常抽象,难以理解。它对于接口的调用者简单,而对于接口的实现者来说,是个难题。在实现一个异步接口的时候,如何返回一个恰当的Observable实例,有时候并不是那么显而易见。
Observable依赖subscribe去驱动它的上游开始运行。也就是说,你如果只是添加一个任务,但不去观察它,它就不会执行!如果你只是想运行一个任务,但并不关心结果,那么,这办不到。举个不恰当的例子,这有点像量子力学,观察对结果造成影响......
受前一点影响,在本文给出的GitHub代码的实现中,第一个任务的真正启动运行,并不是在
addTask
中,而是有所延迟,延迟到调用者的subscribe开始执行后。而且其执行线程环境有可能受到调用者对于Schedulers的设置的影响(比如通过subscribeOn),有不在主线程执行的风险。RxJava在调试时会出现奇怪的、让人难以理解的调用栈。
在本文结束之前,我再提出一个有趣的开放性问题。本文GitHub上给出的代码大量使用了匿名类(相当于Java 8的lambda表达式),这会导致对象之间的引用关系变得复杂。那么,对于这些对象的引用关系的分析,会是一个很有趣的话题。比如,这些引用关系开始是如何随着程序执行建立起来的,最终销毁的时候又是如何解除的?有没有内存泄露呢?欢迎留言讨论。
其它精选文章:
技术的正宗与野路子
程序员的那些反模式
编程世界的熵增原理
程序员的宇宙时间线
Android端外推送到底有多烦?
Android和iOS开发中的异步处理(一)——开篇
用树型模型管理App数字和红点提示
Redis内部数据结构详解(5)——quicklist
宇宙尽头的描述符(下)
Redis内部数据结构详解(1)——dict
扫码或长按关注微信公众号:张铁蕾。
有时候写点技术干货,有时候写点有趣的文章。
这个公众号有点科幻。