目录
- 引言
- Looper的创建
- 发送消息与监听请求
- 发送消息
- 监听请求
- Looper 处理消息或请求
- 结束
引言
在分析底层源码时,时不时会碰到 Looper::wake() 或者 Looper::pollOnce() 这样的代码,之前大概知道是 Native 层的消息循环机制。为了以后我也能够使用它,我决定还是彻底分析一遍源码。
本文只涉及一个文件,路径如下
system/core/libutils/Looper.cpp
Looper的创建
在 Java 层,有一个线程的子类 HandlerThread,它可以创建一个线程,并且使用消息机制,其中的关键两步是 Looper::prepare() 和 Looper::loop()。
Looper::prepare() 会创建一个与线程绑定的 Looper 对象,这个绑定是通过 ThreadLocal实现的,而 Looper::loop() 会让线程进入无限循环来处理消息。
在 Native 层,也有一个 Looper 类,可以通过 Looper::prepare() 来创建 Looper 对象,代码如下
sp<Looper> Looper::prepare(int opts) { | |
// opts决定Looper::addFd()的参数callback是否可以为空 | |
bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS; | |
// 通过pthread_getspecific()获取线程本地存储中的Looper对象 | |
sp<Looper> looper = Looper::getForThread(); | |
if (looper == nullptr) { | |
looper = new Looper(allowNonCallbacks); | |
// 通过pthread_setspecific()把Looper对象保存到线程本地存储中 | |
Looper::setForThread(looper); | |
} | |
return looper; | |
} |
Native 层的线程在首次调用 Looper::prepare() 时,会创建 Looper 对象,并通过 pthread_setspecific() 把它保存到线程本地存储中,后面再获取 Looper 对象时,通过 pthread_getspecific() 从线程本地存储中获取。
这不就是 Java 的 ThreadLocal 的功能吗?
现在让我们来看下 Looper 对象的创建过程
Looper::Looper(bool allowNonCallbacks) | |
: mAllowNonCallbacks(allowNonCallbacks), | |
mSendingMessage(false), | |
mPolling(false), | |
mEpollRebuildRequired(false), | |
mNextRequestSeq(), | |
mResponseIndex(), | |
mNextMessageUptime(LLONG_MAX) { | |
//. 创建 eventfd 对象,用于唤醒 Looper | |
mWakeEventFd.reset(eventfd(, EFD_NONBLOCK | EFD_CLOEXEC)); | |
AutoMutex _l(mLock); | |
//. 创建 epoll 对象,并监听刚才创建的 eventfd 的输入事件 | |
rebuildEpollLocked(); | |
} |
首先创建了一个eventfd 对象,由 mWakeEventFd 代表,它用于唤醒 Looper 。如何唤醒呢? 继续往下看。
然后调用 rebuildEpollLocked() 创建一个 epoll 对象,并监听刚才创建的 mWakeEventFd 的 I/O 事件,代码如下
void Looper::rebuildEpollLocked() { | |
// ... | |
// 创建epoll对象 | |
mEpollFd.reset(epoll_create(EPOLL_CLOEXEC)); | |
// 监听 mWakeEventFd 输入事件 | |
struct epoll_event eventItem; | |
memset(& eventItem,, sizeof(epoll_event)); // zero out unused members of data field union | |
eventItem.events = EPOLLIN; | |
eventItem.data.fd = mWakeEventFd.get(); | |
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem); | |
for (size_t i =; i < mRequests.size(); i++) { | |
// ... | |
} | |
} |
epoll 对象监听了 mWakeEventFd 的可读事件。在后面的分析中,我们将看到,当 Looper 开始轮询时,会调用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 写入数据时,epoll_wait() 将返回,那么 Looper 就被唤醒。
发送消息与监听请求
Native 层的 Looper 可以处理两种类型的事件,一种是消息( Message ),另一种是请求( Request )。下面我们来看看如何向 Looper 发送消息,如何让 Looper 监听请求。
发送消息
通过 Looper::sendMessageXXX() 这一类函数,可以向 Looper 发送消息
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, | |
const Message& message) { | |
size_t i =; | |
{ // acquire lock | |
// 通过锁,可以保存 mMessageEnvelopes 的线程安全 | |
AutoMutex _l(mLock); | |
// 获取消息要插入的位置 | |
size_t messageCount = mMessageEnvelopes.size(); | |
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { | |
i +=; | |
} | |
//. 把信息保存到 mMessageEnvelopes 中 | |
MessageEnvelope messageEnvelope(uptime, handler, message); | |
mMessageEnvelopes.insertAt(messageEnvelope, i,); | |
// mSendingMessage 表明 Looper 正在处理消息,因此不用唤醒Looper | |
if (mSendingMessage) { | |
return; | |
} | |
} // release lock | |
//. 如有必要,就唤醒Looper | |
if (i ==) { | |
wake(); | |
} | |
} |
当 Looper 接收到消息时,它会把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么会调用 Looper::wake() 唤醒 Looper 来处理消息。
前面我们大概地说明了下如何通过 mWakeEventFd 这个 eventfd 对象唤醒 Looper,现在让我们来看下唤醒是如何实现的
void Looper::wake() { | |
uint_t inc = 1; | |
// 向 mWakeFd 中写入数据 | |
// TEMP_FAILURE_RETRY 是一个重试机制,确保不会因为系统中断而导致数据没有写入 | |
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint_t))); | |
// 确保写入的是 unsigned-bit 的数据 | |
if (nWrite != sizeof(uint_t)) { | |
if (errno != EAGAIN) { | |
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s", | |
mWakeEventFd.get(), nWrite, strerror(errno)); | |
} | |
} | |
} |
原来就是向 mWakeEventFd 写数据。在后面我们将看到,当 Looper 进入轮询时, 当epoll_wait() 检测到 mWakeEventFd 有数据可读时,就会从阻塞中醒来,从而达到 mWakeEventFd 唤醒 Looper 的目的。
监听请求
现在我们来看下如何让 Looper 监听请求,它是通过 Looper::addFd() 实现的
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { | |
//. 确认 ident 的值 | |
if (!callback.get()) { // 回调为空 | |
if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在创建Looper时初始化的 | |
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); | |
return -; | |
} | |
if (ident <) { // 回调为空,ident的值不能小于0 | |
ALOGE("Invalid attempt to set NULL callback with ident <."); | |
return -; | |
} | |
} else { // 回调不为空 | |
// POLL_CALLBACK值为-,表示请求通过回调处理请求 | |
ident = POLL_CALLBACK; | |
} | |
{ // acquire lock | |
AutoMutex _l(mLock); | |
//. 包装成一个 Request | |
Request request; | |
request.fd = fd; | |
request.ident = ident; | |
request.events = events; | |
request.seq = mNextRequestSeq++; | |
request.callback = callback; | |
request.data = data; | |
if (mNextRequestSeq == -) mNextRequestSeq = 0; // reserve sequence number -1 | |
struct epoll_event eventItem; | |
request.initEventItem(&eventItem); | |
//. 用 epoll 对象监听 fd 的事件,并把请求保存到 mRequests 中 | |
ssize_t requestIndex = mRequests.indexOfKey(fd); | |
if (requestIndex <) { | |
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem); | |
if (epollResult <) { | |
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno)); | |
return -; | |
} | |
mRequests.add(fd, request); | |
} else { | |
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem); | |
if (epollResult <) { | |
// ... | |
} | |
mRequests.replaceValueAt(requestIndex, request); | |
} | |
} // release lock | |
return; | |
} |
Looper::addFd() 的实质就是用 epoll 对象监听指定 fd 的 I/O 事件。为何我把这一过程称之为 监听请求 呢 ? 因为这个函数把它的参数包装成一个 Request 对象,并保存到 mRequests 容器 中。
Looper 进行轮询时,epoll_wait() 会阻塞,当 fd 指向的文件有 I/O 事件时,epoll_wait() 将会获取到 fd 的可读事件,因此 Looper 会被唤醒。
当 Looper 检测到有请求到来时,一般是通过回调处理的,也就是这里的参数 callback。当然,也可以不设置回调,当有请求到来时,交给外部的调用者去处理,我们将会在后面看到。
根据以上知识,我们来理解 Looper::addFd() 的第一步。当 callback 参数为空时,ident 必须大于或等于0,否则为 POLL_CALLBACK,注意,这的值是 -2。 因此呢,当我们检测到一个请求的 ident 大于或等于0时,这个请求肯定不是通过回调处理的。这一点非常重要,我们将会在后面用到。
Looper 处理消息或请求
Native 层的 Looper 是通过 Looper::pollOnce() 或 Looper::pollAll() 来统一处理消息和请求的,我们挑 Looper::pollOnce() 这个函数来分析下
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { | |
int result =; | |
for (;;) { // 无限循环 | |
//. 处理那些不是通过callback处理的请求 | |
while (mResponseIndex < mResponses.size()) { | |
// ... | |
} | |
//. 处理pollInner()轮询的结果 | |
if (result !=) { | |
// ... | |
} | |
//. epoll 等待并处理事件(如果有事件到来) | |
result = pollInner(timeoutMillis); | |
} | |
} |
当首次调用 Looper::pollOnce() 时,第一步和第二步肯定不会发生,那么我们先来看下第三步,这个函数比较长,我们一步步解析
int Looper::pollInner(int timeoutMillis) { | |
// 省略计算 timeoutMillis 的代码 | |
int result = POLL_WAKE; | |
mResponses.clear(); | |
mResponseIndex =; | |
mPolling = true; | |
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; | |
//. 等待事件 | |
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); | |
// ... | |
} |
第一步,通过 epoll_wait() 阻塞地等待它监听的 fd 的 I/O 就绪, 此时 Looper 进入休眠。
那么怎么唤醒 Looper 呢? 根据前面的分析,epoll 对象监听了 mWakeEventFd 以及 通过 Looper::addFd() 添加的 fd。 那么向这些被监听的 fd 写入数据,就可以唤醒 Looper。例如,Looper::wake() 就是通过向 mWakeEventFd 写入数据来唤醒 Looper。
那么现在我们来看下 Looper 被唤醒后的处理流程
int Looper::pollInner(int timeoutMillis) { | |
// ... | |
//. 等待I/O就绪 | |
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); | |
// ... | |
//. 处理 epoll 事件 | |
for (int i =; i < eventCount; i++) { | |
int fd = eventItems[i].data.fd; | |
uint_t epollEvents = eventItems[i].events; | |
if (fd == mWakeEventFd.get()) { | |
//.1 处理被消息唤醒的情况 | |
if (epollEvents & EPOLLIN) { | |
// 清理eventfd中的数据 | |
awoken(); | |
} else { | |
ALOGW("Ignoring unexpected epoll eventsx%x on wake event fd.", epollEvents); | |
} | |
} else { | |
//.2 处理被请求唤醒的情况 | |
ssize_t requestIndex = mRequests.indexOfKey(fd); | |
if (requestIndex >=) { | |
// 根据epoll触发的事件类型,填充events相应的位 | |
int events =; | |
if (epollEvents & EPOLLIN) events |= EVENT_INPUT; | |
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; | |
if (epollEvents & EPOLLERR) events |= EVENT_ERROR; | |
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; | |
// 创建Response对象,保存两个参数,然后把Response对象保存到mResponses中 | |
pushResponse(events, mRequests.valueAt(requestIndex)); | |
} else { | |
ALOGW("Ignoring unexpected epoll eventsx%x on fd %d that is " | |
"no longer registered.", epollEvents, fd); | |
} | |
} | |
} | |
} |
当 mWakeEventFd 的 I/O 就绪,就会走到2.1步,之后会读取 mWakeEventFd 中的数据,读取的数据并没有什么用,只是清理数据而已。而这一步,大部分情况 是由于消息的到来,而极少情况是并不是因为消息的到来,而是因为线程有紧急事情需要处理,所以必须要唤醒。
当通过Looper::addFd() 添加的 fd 就绪时,就会走到 2.2 步,这一步一定是因为请求到来了。它会创建 Reponse 对象,并保存,代码如下
void Looper::pushResponse(int events, const Request& request) { | |
Response response; | |
response.events = events; | |
response.request = request; | |
mResponses.push(response); | |
} |
这里我们要注意下,mResponses 容器表示待处理请求的集合,这些请求会在后面处理,让我们接着往下看。
int Looper::pollInner(int timeoutMillis) { | |
// ... | |
//. 等待I/O就绪 | |
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); | |
// ... | |
//. 处理 epoll 事件 | |
for (int i =; i < eventCount; i++) { | |
// ... | |
} | |
Done: ; | |
mNextMessageUptime = LLONG_MAX; | |
//. 处理消息 | |
while (mMessageEnvelopes.size() !=) { // 循环处理消息 | |
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); | |
//.1 取出队头消息 | |
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(); | |
if (messageEnvelope.uptime <= now) { | |
//.2 队头消息处理的时间点小于当前时间点,表示要立即处理消息 | |
{ | |
// 获取handler | |
sp<MessageHandler> handler = messageEnvelope.handler; | |
Message message = messageEnvelope.message; | |
mMessageEnvelopes.removeAt(); | |
mSendingMessage = true; | |
mLock.unlock(); | |
// 消息交给handler处理 | |
handler->handleMessage(message); | |
} // release handler | |
mLock.lock(); | |
mSendingMessage = false; | |
// POLL_CALLBACK 表示消息被回调处理 | |
result = POLL_CALLBACK; | |
} else { | |
//.2 队头的消息处理的时间点大于当前时间,表示还没有到处理的时间点,就退出处理消息的循环 | |
// mNextMessageUptime 表示下一个消息要处理的时间点,当通过break退出循环后, | |
// 在外层的下一次循调用pollInner()时,会通过 mNextMessageUptime 计算 epoll_wait 的超时时间 | |
mNextMessageUptime = messageEnvelope.uptime; | |
break; | |
} | |
} | |
// Release lock. | |
mLock.unlock(); | |
} |
根据前面分析的消息发送的过程,消息保存在 mMessageEnvelopes 中。那么这里的第三步,很明显是在处理消息。通过循环,不断取出消息,然后把消息的 messageEnvelope.uptime 与当前时间进行比较,如果小于当前时间,就证明要立马处理消息了,否则这些消息只能在下一次轮询中再处理。
处理完了消息,现在来处理请求
int Looper::pollInner(int timeoutMillis) { | |
// ... | |
//. 等待事件 | |
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); | |
// ... | |
//. 处理 epoll 事件 | |
for (int i =; i < eventCount; i++) { | |
// ... | |
} | |
Done: ; | |
//. 处理消息 | |
while (mMessageEnvelopes.size() !=) { // 循环处理消息 | |
// ... | |
} | |
//. 循环处理请求 | |
for (size_t i =; i < mResponses.size(); i++) { | |
Response& response = mResponses.editItemAt(i); | |
// 检测请求是否通过回调处理 | |
if (response.request.ident == POLL_CALLBACK) { | |
int fd = response.request.fd; | |
int events = response.events; | |
void* data = response.request.data; | |
int callbackResult = response.request.callback->handleEvent(fd, events, data); | |
if (callbackResult ==) { | |
removeFd(fd, response.request.seq); | |
} | |
response.request.callback.clear(); | |
// 表明消息被回调处理了 | |
result = POLL_CALLBACK; | |
} | |
} | |
// 返回结果 | |
return result; | |
} |
刚刚我们还提到,mResponse 中保存了待处理的请求。现在通过循环,不断取出请求来处理。处理请求有一个条件,那就是请求必须有回调,否则不处理。 再回顾前面分析 监听请求 的代码,当Looper::addFd() 的参数 callback 不为空时,Request.ident 的值为 POLL_CALLBACK,表明请求需要通过回调处理。
Looper::pollInner() 函数分析完毕,现在再回到 Looper::pollOnce()
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { | |
int result =; | |
for (;;) { // 无限循环 | |
//. 处理那些不是通过callback处理的请求 | |
while (mResponseIndex < mResponses.size()) { | |
const Response& response = mResponses.itemAt(mResponseIndex++); | |
int ident = response.request.ident; | |
// 从Looper::addFd()分析可知,只有当callback为空的情况下,ident的值>=,否则为POLL_CALLBACK(-2) | |
// 因此,这里处理的是那些没有通过callback处理的请求 | |
if (ident >=) { | |
int fd = response.request.fd; | |
int events = response.events; | |
void* data = response.request.data; | |
// 因为Looper无法通过callback处理,所以把这些元数据交给调用者处理 | |
if (outFd != nullptr) *outFd = fd; | |
if (outEvents != nullptr) *outEvents = events; | |
if (outData != nullptr) *outData = data; | |
// 注意,这里返回的值大于 | |
return ident; | |
} | |
} | |
//. 处理pollInner()轮询的结果 | |
// result的值有很多种,但是都为负数(注意,上面处理那些不是通过callback处理的请求,返回正值) | |
//. POLL_WAKE(-1): 表示epoll_wait()是被eventfd唤醒的 | |
//. POLL_ERROR(-4): 表示epoll_wait()出错 | |
//. POLL_TIMEOUT(-3) : 表示epoll_wait()超时 | |
//. POLL_CALLBACK(-2) : 表示消息或请求是通过回调处理的 | |
if (result !=) { | |
// 消息或事件无论是否被callback处理,这些传入的参数都没有意义,因此清空 | |
if (outFd != nullptr) *outFd =; | |
if (outEvents != nullptr) *outEvents =; | |
if (outData != nullptr) *outData = nullptr; | |
// 注意,返回的是负值 | |
return result; | |
} | |
//. epoll 等待并处理事件(如果有事件到来) | |
result = pollInner(timeoutMillis); | |
} | |
} |
从整体看,当 pollInner() 返回后,就会调用第一步和第二步来处理结果。
首先来看第一步,根据前面 监听请求 的分析,当 Looper::addFd() 的参数 callback 为空时,Request.ident 的值才大于等于0。Looper::pollInner 只通过回调来处理请求,而对于那些没有回调的请求呢?那就是在这里处理。而处理的方式是直接把元数据返回给调用者,那么意思就很明显了,让调用者自己处理。
再来看第二步,直接返回 Looper::pollInner() 的结果,并把参数清0。因为无论返回的什么结果,这些参数都没有意义了,这一点请大家自己体会。
关于第一步和第二步,还有一点需要关注,第一步的返回值是正值,而第二步返回值是负值。
结束
本文对 Native 的 Looper 的主要函数进行分析,揭开了 Native 层消息机制的核心,但是目前我并不能给一个很好例子来理解本文的内容。需要大家在分析 Native 层源码时慢慢体会。
可能有人会问,你为何不以 Java 层的消息机制为例来引出 Native 层的消息机制呢? 因为这样废话太多。