Native层消息机制深入探究实例解析

手机APP/开发
321
0
0
2023-07-23
目录
  • 引言
  • Looper的创建
  • 发送消息与监听请求
  • 发送消息
  • 监听请求
  • Looper 处理消息或请求
  • 结束

引言


在分析底层源码时,时不时会碰到 Looper::wake() 或者 Looper::pollOnce() 这样的代码,之前大概知道是 Native 层的消息循环机制。为了以后我也能够使用它,我决定还是彻底分析一遍源码。

本文只涉及一个文件,路径如下

system/core/libutils/Looper.cpp

Looper的创建

在 Java 层,有一个线程的子类 HandlerThread,它可以创建一个线程,并且使用消息机制,其中的关键两步是 Looper::prepare() 和 Looper::loop()。

Looper::prepare() 会创建一个与线程绑定的 Looper 对象,这个绑定是通过 ThreadLocal实现的,而 Looper::loop() 会让线程进入无限循环来处理消息。

在 Native 层,也有一个 Looper 类,可以通过 Looper::prepare() 来创建 Looper 对象,代码如下

sp<Looper> Looper::prepare(int opts) {
// opts决定Looper::addFd()的参数callback是否可以为空
bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
// 通过pthread_getspecific()获取线程本地存储中的Looper对象
sp<Looper> looper = Looper::getForThread();
if (looper == nullptr) {
looper = new Looper(allowNonCallbacks);
// 通过pthread_setspecific()把Looper对象保存到线程本地存储中
Looper::setForThread(looper);
}
return looper;
}

Native 层的线程在首次调用 Looper::prepare() 时,会创建 Looper 对象,并通过 pthread_setspecific() 把它保存到线程本地存储中,后面再获取 Looper 对象时,通过 pthread_getspecific() 从线程本地存储中获取。

这不就是 Java 的 ThreadLocal 的功能吗?

现在让我们来看下 Looper 对象的创建过程

Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(),
mResponseIndex(),
mNextMessageUptime(LLONG_MAX) {
//. 创建 eventfd 对象,用于唤醒 Looper
mWakeEventFd.reset(eventfd(, EFD_NONBLOCK | EFD_CLOEXEC));
AutoMutex _l(mLock);
//. 创建 epoll 对象,并监听刚才创建的 eventfd 的输入事件
rebuildEpollLocked();
}

首先创建了一个eventfd 对象,由 mWakeEventFd 代表,它用于唤醒 Looper 。如何唤醒呢? 继续往下看。

然后调用 rebuildEpollLocked() 创建一个 epoll 对象,并监听刚才创建的 mWakeEventFd 的 I/O 事件,代码如下

void Looper::rebuildEpollLocked() {
// ...
// 创建epoll对象
mEpollFd.reset(epoll_create(EPOLL_CLOEXEC));
// 监听 mWakeEventFd 输入事件
struct epoll_event eventItem;
memset(& eventItem,, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i =; i < mRequests.size(); i++) {
// ...
}
}

epoll 对象监听了 mWakeEventFd 的可读事件。在后面的分析中,我们将看到,当 Looper 开始轮询时,会调用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 写入数据时,epoll_wait() 将返回,那么 Looper 就被唤醒。

发送消息与监听请求

Native 层的 Looper 可以处理两种类型的事件,一种是消息( Message ),另一种是请求( Request )。下面我们来看看如何向 Looper 发送消息,如何让 Looper 监听请求。

发送消息

通过 Looper::sendMessageXXX() 这一类函数,可以向 Looper 发送消息

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i =;
{ // acquire lock
// 通过锁,可以保存 mMessageEnvelopes 的线程安全
AutoMutex _l(mLock);
// 获取消息要插入的位置
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i +=;
}
//. 把信息保存到 mMessageEnvelopes 中
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i,);
// mSendingMessage 表明 Looper 正在处理消息,因此不用唤醒Looper
if (mSendingMessage) {
return;
}
} // release lock
//. 如有必要,就唤醒Looper
if (i ==) {
wake();
}
}

当 Looper 接收到消息时,它会把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么会调用 Looper::wake() 唤醒 Looper 来处理消息。

前面我们大概地说明了下如何通过 mWakeEventFd 这个 eventfd 对象唤醒 Looper,现在让我们来看下唤醒是如何实现的

void Looper::wake() {
uint_t inc = 1;
// 向 mWakeFd 中写入数据
// TEMP_FAILURE_RETRY 是一个重试机制,确保不会因为系统中断而导致数据没有写入
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint_t)));
// 确保写入的是 unsigned-bit 的数据
if (nWrite != sizeof(uint_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

原来就是向 mWakeEventFd 写数据。在后面我们将看到,当 Looper 进入轮询时, 当epoll_wait() 检测到 mWakeEventFd 有数据可读时,就会从阻塞中醒来,从而达到 mWakeEventFd 唤醒 Looper 的目的。

监听请求

现在我们来看下如何让 Looper 监听请求,它是通过 Looper::addFd() 实现的

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
//. 确认 ident 的值
if (!callback.get()) { // 回调为空
if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在创建Looper时初始化的
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -;
}
if (ident <) { // 回调为空,ident的值不能小于0
ALOGE("Invalid attempt to set NULL callback with ident <.");
return -;
}
} else { // 回调不为空
// POLL_CALLBACK值为-,表示请求通过回调处理请求
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
//. 包装成一个 Request
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
//. 用 epoll 对象监听 fd 的事件,并把请求保存到 mRequests 中
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex <) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult <) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult <) {
// ...
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return;
}

Looper::addFd() 的实质就是用 epoll 对象监听指定 fd 的 I/O 事件。为何我把这一过程称之为 监听请求 呢 ? 因为这个函数把它的参数包装成一个 Request 对象,并保存到 mRequests 容器 中。

Looper 进行轮询时,epoll_wait() 会阻塞,当 fd 指向的文件有 I/O 事件时,epoll_wait() 将会获取到 fd 的可读事件,因此 Looper 会被唤醒。

当 Looper 检测到有请求到来时,一般是通过回调处理的,也就是这里的参数 callback。当然,也可以不设置回调,当有请求到来时,交给外部的调用者去处理,我们将会在后面看到。

根据以上知识,我们来理解 Looper::addFd() 的第一步。当 callback 参数为空时,ident 必须大于或等于0,否则为 POLL_CALLBACK,注意,这的值是 -2。 因此呢,当我们检测到一个请求的 ident 大于或等于0时,这个请求肯定不是通过回调处理的。这一点非常重要,我们将会在后面用到。

Looper 处理消息或请求

Native 层的 Looper 是通过 Looper::pollOnce() 或 Looper::pollAll() 来统一处理消息和请求的,我们挑 Looper::pollOnce() 这个函数来分析下

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result =;
for (;;) { // 无限循环
//. 处理那些不是通过callback处理的请求
while (mResponseIndex < mResponses.size()) {
// ...
}
//. 处理pollInner()轮询的结果
if (result !=) {
// ...
}
//. epoll 等待并处理事件(如果有事件到来)
result = pollInner(timeoutMillis);
}
}

当首次调用 Looper::pollOnce() 时,第一步和第二步肯定不会发生,那么我们先来看下第三步,这个函数比较长,我们一步步解析

int Looper::pollInner(int timeoutMillis) {
// 省略计算 timeoutMillis 的代码
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex =;
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
}

第一步,通过 epoll_wait() 阻塞地等待它监听的 fd 的 I/O 就绪, 此时 Looper 进入休眠。

那么怎么唤醒 Looper 呢? 根据前面的分析,epoll 对象监听了 mWakeEventFd 以及 通过 Looper::addFd() 添加的 fd。 那么向这些被监听的 fd 写入数据,就可以唤醒 Looper。例如,Looper::wake() 就是通过向 mWakeEventFd 写入数据来唤醒 Looper。

那么现在我们来看下 Looper 被唤醒后的处理流程

int Looper::pollInner(int timeoutMillis) {
// ...
//. 等待I/O就绪
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
//. 处理 epoll 事件
for (int i =; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
//.1 处理被消息唤醒的情况
if (epollEvents & EPOLLIN) {
// 清理eventfd中的数据
awoken();
} else {
ALOGW("Ignoring unexpected epoll eventsx%x on wake event fd.", epollEvents);
}
} else {
//.2 处理被请求唤醒的情况
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >=) {
// 根据epoll触发的事件类型,填充events相应的位
int events =;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 创建Response对象,保存两个参数,然后把Response对象保存到mResponses中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll eventsx%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
}

当 mWakeEventFd 的 I/O 就绪,就会走到2.1步,之后会读取 mWakeEventFd 中的数据,读取的数据并没有什么用,只是清理数据而已。而这一步,大部分情况 是由于消息的到来,而极少情况是并不是因为消息的到来,而是因为线程有紧急事情需要处理,所以必须要唤醒。

当通过Looper::addFd() 添加的 fd 就绪时,就会走到 2.2 步,这一步一定是因为请求到来了。它会创建 Reponse 对象,并保存,代码如下

void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}

这里我们要注意下,mResponses 容器表示待处理请求的集合,这些请求会在后面处理,让我们接着往下看。

int Looper::pollInner(int timeoutMillis) {
// ...
//. 等待I/O就绪
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
//. 处理 epoll 事件
for (int i =; i < eventCount; i++) {
// ...
}
Done: ;
mNextMessageUptime = LLONG_MAX;
//. 处理消息
while (mMessageEnvelopes.size() !=) { // 循环处理消息
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
//.1 取出队头消息
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt();
if (messageEnvelope.uptime <= now) {
//.2 队头消息处理的时间点小于当前时间点,表示要立即处理消息
{
// 获取handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt();
mSendingMessage = true;
mLock.unlock();
// 消息交给handler处理
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
// POLL_CALLBACK 表示消息被回调处理
result = POLL_CALLBACK;
} else {
//.2 队头的消息处理的时间点大于当前时间,表示还没有到处理的时间点,就退出处理消息的循环
// mNextMessageUptime 表示下一个消息要处理的时间点,当通过break退出循环后,
// 在外层的下一次循调用pollInner()时,会通过 mNextMessageUptime 计算 epoll_wait 的超时时间
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
}

根据前面分析的消息发送的过程,消息保存在 mMessageEnvelopes 中。那么这里的第三步,很明显是在处理消息。通过循环,不断取出消息,然后把消息的 messageEnvelope.uptime 与当前时间进行比较,如果小于当前时间,就证明要立马处理消息了,否则这些消息只能在下一次轮询中再处理。

处理完了消息,现在来处理请求

int Looper::pollInner(int timeoutMillis) {
// ...
//. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
//. 处理 epoll 事件
for (int i =; i < eventCount; i++) {
// ...
}
Done: ;
//. 处理消息
while (mMessageEnvelopes.size() !=) { // 循环处理消息
// ...
}
//. 循环处理请求
for (size_t i =; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// 检测请求是否通过回调处理
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult ==) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
// 表明消息被回调处理了
result = POLL_CALLBACK;
}
}
// 返回结果
return result;
}

刚刚我们还提到,mResponse 中保存了待处理的请求。现在通过循环,不断取出请求来处理。处理请求有一个条件,那就是请求必须有回调,否则不处理。 再回顾前面分析 监听请求 的代码,当Looper::addFd() 的参数 callback 不为空时,Request.ident 的值为 POLL_CALLBACK,表明请求需要通过回调处理。

Looper::pollInner() 函数分析完毕,现在再回到 Looper::pollOnce()

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result =;
for (;;) { // 无限循环
//. 处理那些不是通过callback处理的请求
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
// 从Looper::addFd()分析可知,只有当callback为空的情况下,ident的值>=,否则为POLL_CALLBACK(-2)
// 因此,这里处理的是那些没有通过callback处理的请求
if (ident >=) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 因为Looper无法通过callback处理,所以把这些元数据交给调用者处理
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
// 注意,这里返回的值大于
return ident;
}
}
//. 处理pollInner()轮询的结果
// result的值有很多种,但是都为负数(注意,上面处理那些不是通过callback处理的请求,返回正值)
//. POLL_WAKE(-1): 表示epoll_wait()是被eventfd唤醒的
//. POLL_ERROR(-4): 表示epoll_wait()出错
//. POLL_TIMEOUT(-3) : 表示epoll_wait()超时
//. POLL_CALLBACK(-2) : 表示消息或请求是通过回调处理的
if (result !=) {
// 消息或事件无论是否被callback处理,这些传入的参数都没有意义,因此清空
if (outFd != nullptr) *outFd =;
if (outEvents != nullptr) *outEvents =;
if (outData != nullptr) *outData = nullptr;
// 注意,返回的是负值
return result;
}
//. epoll 等待并处理事件(如果有事件到来)
result = pollInner(timeoutMillis);
}
}

从整体看,当 pollInner() 返回后,就会调用第一步和第二步来处理结果。

首先来看第一步,根据前面 监听请求 的分析,当 Looper::addFd() 的参数 callback 为空时,Request.ident 的值才大于等于0。Looper::pollInner 只通过回调来处理请求,而对于那些没有回调的请求呢?那就是在这里处理。而处理的方式是直接把元数据返回给调用者,那么意思就很明显了,让调用者自己处理。

再来看第二步,直接返回 Looper::pollInner() 的结果,并把参数清0。因为无论返回的什么结果,这些参数都没有意义了,这一点请大家自己体会。

关于第一步和第二步,还有一点需要关注,第一步的返回值是正值,而第二步返回值是负值。

结束

本文对 Native 的 Looper 的主要函数进行分析,揭开了 Native 层消息机制的核心,但是目前我并不能给一个很好例子来理解本文的内容。需要大家在分析 Native 层源码时慢慢体会。

可能有人会问,你为何不以 Java 层的消息机制为例来引出 Native 层的消息机制呢? 因为这样废话太多。