其他
全方位了解8.0系统下的Handler
https://www.jianshu.com/u/9cf1f31e1d09
Handler如何保证运行在目标线程 Handler容易造成内存泄漏的原因 loop()为什么不会阻塞,CPU为什么不会忙等 MessageQueue如何存储 Message如何缓存 什么是线程空闲消息 线程如何使用Handler机制
private static Message sPool;
// 缓存池当前容量
private static int sPoolSize = 0;
// 下一节点
Message next;
public static Message obtain() {
// 确保同步
synchronized (sPoolSync) {
if (sPool != null) {
// 缓存池不为空
Message m = sPool;
// 缓存池指向下一个Message节点
sPool = m.next;
// 从缓存池拿到的Message对象与缓存断开连接
m.next = null;
m.flags = 0; // clear in-use flag
// 缓存池大小减一
sPoolSize--;
return m;
}
}
// 缓存吃没有可用对象,返回新的Message()
return new Message();
}
将Message对象发送到 发送Runnable,通过getPostMessage()将Runnable包装在Message里,表现为成员变量callback
// 获取Message
Message m = Message.obtain();
// 记住Runnale,等消息获得执行时回调
m.callback = r;
return m;
}
// Message.target 记住 Handler 以明确是由哪一个Handler来处理这个消息的
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
// 消息入队
return queue.enqueueMessage(msg, uptimeMillis);
}
// 拿到Looper
final Looper me = myLooper();
if (me == null) {
// 没调用prepare初始化Looper,报错
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 拿到消息队列
final MessageQueue queue = me.mQueue;
......
for (;;) {
// 从消息队列取出下一个信息
Message msg = queue.next();
if (msg == null) {
// 消息为空,返回
return;
}
.......
try {
// 分发消息到Handler
msg.target.dispatchMessage(msg);
end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
}
// 消息回收,放入缓存池
msg.recycleUnchecked();
}
if (msg.callback != null) {
// 这个情况说明了次消息为Runnable,触发Runnable.run()
handleCallback(msg);
} else {
if (mCallback != null) {
// 指定了Handler的mCallback
if (mCallback.handleMessage(msg)) {
return;
}
}
// 普通消息处理
handleMessage(msg);
}
}
可以通过Handler发送Runnable消息到消息队列,因此handleCallback()处理这种情况 可以给Handler设置Callback,当分配消息给Handler时,Callback可以优先处理此消息,如果Callback.handleMessage()返回了true,不再执行Handler.handleMessage() Handler.handleMessage()处理具体逻辑
// 这里是将Message各种属性重置操作
......
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
// 缓存池还能装下,回收到缓存池
// 下面操作将此Message加入到缓存池头部
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
Handler 从缓存池获取Message,发送到MessageQueue Looper不断从MessageQueue读取消息,通过Message.target.dispatchMessage()触发Handler处理逻辑 回收Message到缓存池
// 保证Looper在线程唯一
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
// 将Looper放入ThreadLocal
sThreadLocal.set(new Looper(quitAllowed));
}
// 初始化MessageQueue
mQueue = new MessageQueue(quitAllowed);
// 记住当前线程
mThread = Thread.currentThread();
}
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
// 与Native建立连接
mPtr = nativeInit();
}
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
// 返回给Java层的mPtr, NativeMessageQueue地址值
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
// 检查Looper 是否创建
if (mLooper == NULL) {
mLooper = new Looper(false);
// 确保Looper唯一
Looper::setForThread(mLooper);
}
}
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
// 添加到epoll的文件描述符,线程唤醒事件的fd
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
.....
// Allocate the new epoll instance and register the wake pipe.
// 创建epolle实例,并注册wake管道
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
// 清空,把未使用的数据区域进行置0操作
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
// 监听可读事件
eventItem.events = EPOLLIN;
// 设置作为唤醒评判的fd
eventItem.data.fd = mWakeEventFd;
// 将唤醒事件(mWakeEventFd)添加到epoll实例,意为放置一个唤醒机制
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// 添加各种事件的fd到epoll实例,如键盘、传感器输入等
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
/ 发送数据的具体过程 /
......
synchronized (this) {
......
// 记录消息处理的时间
msg.when = when;
Message p = mMessages;
// 唤醒线程的标志位
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 这里三种情况:
// 1、目标消息队列是空队列
// 2、插入的消息处理时间等于0
// 3、插入的消息处理时间小于保存在消息队列头的消息处理时间
// 这三种情况都插入列表头
msg.next = p;
mMessages = msg;
// mBlocked 表示当前线程是否睡眠
needWake = mBlocked;
} else {
// 这里则说明消息处理时间大于消息列表头的处理时间,因此需要找到合适的插入位置
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 这里的循环是找到消息的插入位置
for (;;) {
prev = p;
p = p.next;
// 到链表尾,或处理时间早于p的时间
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
// 如果插入的消息在目标队列中间,是不需要检查改变线程唤醒状态的
needWake = false;
}
}
// 插入到消息队列
msg.next = p;
prev.next = msg;
}
if (needWake) {
// 唤醒线程
nativeWake(mPtr);
}
}
return true;
}
目标消息队列是空队列 插入的消息处理时间等于0 插入的消息处理时间小于保存在消息队列头的消息处理时间 插入的消息处理时间大于消息队列头的消息处理时间
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 向管道写入一个新数据,这样管道因为发生了IO事件被唤醒
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}
final long ptr = mPtr;
if (ptr == 0) {
// 获取NativeMessageQueue地址失败,无法正常使用epoll机制
return null;
}
// 用来保存注册到消息队列中的空闲消息处理器(IdleHandler)的个数
int pendingIdleHandlerCount = -1;
// 如果这个变量等于0,表示即便消息队列中没有新的消息需要处理,当前
// 线程也不要进入睡眠等待状态。如果值等于-1,那么就表示当消息队列中没有新的消息
// 需要处理时,当前线程需要无限地处于休眠等待状态,直到它被其它线程唤醒为止
int nextPollTimeoutMillis = 0;
for (;;) {
......
// 检查当前线程的消息队列中是否有新的消息需要处理,尝试进入休眠
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// 当前时间
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
// mMessages 表示当前线程需要处理的消息
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 找到有效的Message
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
/**
* 检查当前时间和消息要被处理的时间,如果小于当前时间,说明马上要进行处理
*/
if (now < msg.when) {
// 还没达到下一个消息需要被处理的时间,计算需要休眠的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 有消息需要处理
// 不要进入休眠
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
// 指向下一个需要处理的消息
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// 没有更多消息,休眠时间无限
nextPollTimeoutMillis = -1;
}
......
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
// 获取IdleHandler数
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// 没有IdleHandler需要处理,可直接进入休眠
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 如果没有更多要进行处理的消息,在休眠之前,发送线程空闲消息给已注册到消息队列中的IdleHandler对象来处理
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
// 处理对应逻辑,并由自己决定是否保持激活状态
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
// 不需要存活,移除
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// 重置IdleHandler数量
pendingIdleHandlerCount = 0;
/**
* 这里置0,表示下一次循环不能马上进入休眠状态,因为IdleHandler在处理事件的时间里,
* 有可能有新的消息发送来过来,需要重新检查。
*/
nextPollTimeoutMillis = 0;
}
}
......
// 这个是用来监听实例化时创建的epoll实例的文件描述符的IO读写事件
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 如果没有事件,进入休眠,timeoutMillis为休眠事件
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
......
/**
* 检测是哪一个文件描述符发生了IO读写事件
*/
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
// 如果文件描述符为mWakeEventFd,并且读写事件类型为EPOLLIN,说明
// 当前线程所关联的一个管道被写入了一个新的数据
// 唤醒
awoken();
}
}
......
}
}
Looper通过prepare()创建,借助ThreadLocal保证线程唯一,如果没有进行prepare(),调用Loop()会抛出异常 Looper在实例化时创建MessageQueue,MessageQueue与NativeMessageQueue建立连接,NativeMessageQueue存储地址存于MessageQueue.mPtr。Native端也建立了Handler机制,使用epoll机制。Java端借由NativeMessageQueue能达到使用epoll机制的目的 从Message缓存里获取Message,缓存为链表存储,从头出取出,并且Message在回收时也是插入头部。如果存缓存里取不到,则新建 Handler向MessageQueue插入消息,如果消息插入消息队列头部,需要唤醒线程;如果插入消息队列中,无需改变线程状态 Looper.loop() 不断从消息队列获取消息,消息队列获取消息时会出现两种情况。如果取到消息,但没达到处理时间,则让线程休眠;如果没有更多消息,则在处理IdleHandler事后,在考虑让线程进入休眠 Message达到了可处理状态,则有Handler处理,处理时考虑三种情况,消息内容为Runnable时、设置了Handle.Callback时、普通消息时,对应调用为Message.callback.run() 、 Callback.handleMessage()、Handler.handleMessage() 从Handler机制里,epoll可以简单理解为,当Handler机制没有消息要处理时,让线程进入休眠,当Handler机制有消息要处理时,将线程唤起。通过Native端监听mWakeEventFd的I/O事件实现