Android Handler 线程消息机制
Android Handler 是用于线程间通信的机制,在不同线程中传递消息
子线程向主线程传递消息
由于Android有UI更新检查机制,只能单线程更新UI ,主线程创建的UI ,子线程无法进行更新,而我们的数据大多数都是通过网络请求获取的,网络请求属于耗时操作,只能在子线程处理,所以我们需要从子线程发送消息把数据发送到主线程进行更新UI.
private val mMainHandler : Handler = object :Handler(Looper.getMainLooper()){
override fun handleMessage(msg: Message) {
Log.d(TAG, "handleMessage() called with: msg = $msg")
if (msg.what == 1){
binding.sampleText.text = msg.obj as String //接收消息,更新主线程UI
}
}
}
Thread {
val message = mMainHandler.obtainMessage() //获取一个空消息
message.what = 1
message.obj = "data from ${Thread.currentThread().name}"
mMainHandler.sendMessage(message) // 发送消息
}.start()
输出
main handleMessage msg = { when=-23ms what=1 obj=data from Thread-2 target=com.greatfeng.jniactivity.MainActivity$mMainHandler$1 }
主线程向子线程传递消息
Thread {
Looper.prepare() //第一步 创建 Looper
//第二步 创建 Handler
mThreadHandler = object : Handler(Looper.myLooper()!!) {
override fun handleMessage(msg: Message) {
Log.d(TAG, "handleMessage() called with: msg = $msg")
}
}
//第三步 调用loop 无限循环
Looper.loop()
}.start()
// 子线程启动,mThreadHandler 初始化之后
val threadMessage = mThreadHandler.obtainMessage()
threadMessage.what = 2
threadMessage.obj = "data from ${Thread.currentThread().name}"
mThreadHandler.sendMessage(threadMessage)
输出
Thread-2 handleMessage msg = { when=0 what=2 obj=data from main target=com.greatfeng.jniactivity.MainActivity$onCreate$1$1 }
源码解析
我们拿子线程创建消息循环机制来说明
1. 子线程中创建 Looper 对象
// Looper.java
// 创建一个子线程的 Looper 对象
public static void prepare() {
prepare(true);
}
// 线程本地变量
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
// 创建一个Looper 对象,并把它存放在线程本地变量当中
sThreadLocal.set(new Looper(quitAllowed));
}
final MessageQueue mQueue;
final Thread mThread;
// Looper 对象创建存储了消息队列,和当前线程
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
Looper 构造函数里面创建了 MessageQueue
// MessageQueue.java
private long mPtr; // used by native code
private native static long nativeInit();
// 构造函数中调用了 native 方法
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit(); // 保存 native 层返回的指针
}
// android_os_MessageQueue.cpp
// 在native层创建了 NativeMessageQueue 对象 并把指针传递给了 java 层进行保存。
// java 层通过这个指针,就能找到该对象了
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
// NativeMessageQueue 构造函数
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 通过Looper的静态方法获取到当前线程的 native Looper对象 ,
// 第一次调用时线程特有数据没有存值,mLooper 为 NULL
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 创建一个 Looper 对象
mLooper = new Looper(false);
// 并把 Looper对象存储在线程特有数据
Looper::setForThread(mLooper);
}
}
要理解 Native 代码需要了解以下内容
// Looper.cpp
// 从线程特有数据获取 Looper 类似 java的 ThreadLocal.get()
sp<Looper> Looper::getForThread() {
// 一次性初始化
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
// 通过 pthread_key_t 从线程本地获取 native Looper对象
Looper* looper = (Looper*)pthread_getspecific(gTLSKey);
return sp<Looper>::fromExisting(looper);
}
// 把 Looper存储到线程特有数据 类似 java的 ThreadLocal.set()
void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
if (looper != nullptr) {
looper->incStrong((void*)threadDestructor);
}
// 存储到线程特有数据
pthread_setspecific(gTLSKey, looper.get());
if (old != nullptr) {
old->decStrong((void*)threadDestructor);
}
}
void Looper::initTLSKey() {
// 创建 pthread_key_t
int error = pthread_key_create(&gTLSKey, threadDestructor);
LOG_ALWAYS_FATAL_IF(error != 0, "Could not allocate TLS key: %s", strerror(error));
}
// Looper 的构造函数
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 创建 eventfd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
// 线程加锁,构造函数加锁,析构函数解锁
AutoMutex _l(mLock);
rebuildEpollLocked();
}
// 初始化 epoll
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
mEpollFd.reset();
}
// 创建 epoll
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
// 创建 epoll_event ,EventFd 序列号为 WAKE_EVENT_FD_SEQ
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// epoll监听 eventfd 可读事件
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// epoll 已经重新创建了,把之前监听文件描述符集合添加到监听
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
2. 创建 Handler
// Handler.java
// 创建 Handler对象需要传入 Looper 对象
public Handler(@NonNull Looper looper) {
this(looper, null, false);
}
// callback 可以进行处理消息
// async 代表是否异步消息
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
// 实现处理消息的接口就可以不覆写 Handler.handleMessage() 方法
public interface Callback {
boolean handleMessage(@NonNull Message msg);
}
// 子类需要覆写的处理消息的方法
public void handleMessage(@NonNull Message msg) {
}
// Handler 获取消息
public final Message obtainMessage()
{
return Message.obtain(this); //调用Message.obtain()获取消息
}
// handler post 一个 Runnable 对象
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
// 会包装成功一个消息存储在 Message.callback 变量中
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
// Handler 消息分发
public void dispatchMessage(@NonNull Message msg) {
// 如果 msg 的 callback 不为空,直接执行 Runnable.run 方法
if (msg.callback != null) {
handleCallback(msg);
} else {
// 如果 handler 构造函数中传递了 Callback 对象会优先交给 mCallback 处理
if (mCallback != null) {
// 如果 mCallback 处理该消息(返回 true) 直接返回
if (mCallback.handleMessage(msg)) {
return;
}
}
// 如果 mCallback 不处理消息就会分发给 Handler.handleMessage() 方法
handleMessage(msg);
}
}
// 直接调用 Runnable.run 方法
private static void handleCallback(Message message) {
message.callback.run();
}
3. 调用 Looper.loop() 方法
Looper.getMainLooper().setMessageLogging(); 可以过滤以下字符串,进行消息处理耗时统计,可以监控主线程是否发生卡顿的情况
>>>>> Dispatching to // 开始分发处理消息
<<<<< Finished to // 消息处理完成
// Looper.java
public static void loop() {
// 从线程本地变量中获取到之前存储的 Looper 对象
final Looper me = myLooper();
for (;;) {
// 无限循环去获取消息
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
@SuppressWarnings("AndroidFrameworkBinderIdentity")
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
// 调用 MessageQueue.next() 去获取消息
Message msg = me.mQueue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
try {
// MessageQueue 中拿到的消息就获取 message 的target字段去处理消息,
// 而这个target 就是handler
msg.target.dispatchMessage(msg);
if (observer != null) {
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// 消息分发完成,回收 Message,放入消息池
msg.recycleUnchecked();
return true;
}
MessageQueue
通过 IdleHandler 主线程消息进行闲时状态回调
// MessageQueue.java
Message next() {
// 创建 MessageQueue 时调用
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 第一次循环 nextPollTimeoutMillis 为 0 不会阻塞
// 当没有消息,并且第没有设置 IdleHandler时
// 第二次循环 nextPollTimeoutMillis 为 -1, 会被阻塞,除非有新消息被唤醒
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages; // 没有消息时 mMessages 为 null
// 除非设置消息屏障,msg.target 不会为 null
if (msg != null && msg.target == null) {
// 如果设置了消息屏障,优先处理异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// 如果消息不为空
if (msg != null) {
if (now < msg.when) {
// 下一条消息尚未到执行时间,设置一个超时唤醒时长
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg; // 拿到消息退出循环
}
} else {
// 没有消息时,下次循环时是 -1
nextPollTimeoutMillis = -1;
}
// 当消息循环没有退出时,mQuitting 为 false
if (mQuitting) {
dispose();
return null;
}
// 第一次循环时 pendingIdleHandlerCount 为 -1
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
//当没有设置 IdleHandler 时 pendingIdleHandlerCount 为 0
if (pendingIdleHandlerCount <= 0) {
// 当没有设置 IdleHandler 时 并且没有消息时会阻塞。
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle(); //如果返回 true,不会被移除掉,下次进入 Idle状态还是会被执行
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
//当执行过一次 IdleHandler.queueIdle() 时,pendingIdleHandlerCount 设置为 0
// 没有消息是不会再次执行 IdleHandler.queueIdle() 方法,
// 除非再次收到消息,跳出循环,pendingIdleHandlerCount 重新设置为 -1,再次没有消息时 进入Idle状态 才会调用。
pendingIdleHandlerCount = 0;
// 当设置 IdleHandler 时,当没有消息时,不会立即阻塞,会执行一次 IdleHandler.queueIdle()
// 再次进行轮询一下 ,还没有消息就会进入阻塞状态
nextPollTimeoutMillis = 0;
}
}
// IdleHandler 接口,没有消息是会进行回调
public static interface IdleHandler {
boolean queueIdle();
}
// 添加 IdleHandler
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}
// 删除 IdleHandler
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}
MessageQueue,Native 代码,调用 Looper 的 pollOnce() 方法
// android_os_MessageQueue.cpp
// 调用 NativeMessageQueue->pollOnce() 方法
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
// 调用 Looper 的 pollOnce() 方法
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
Native Looper.pollOnce() 方法 需要监听处理
- eventfd 文件描述符 ,epoll_event 传递序列号 为 WAKE_EVENT_FD_SEQ ,用于java 层消息循环
- 处理 native message ,进行调用 MessageHandler.handleMessage () 方法
- 处理监控其他文件描述 并调用其回调方法 LooperCallback.handleEvent() 方法
// Looper.cpp
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
// 调用 pollInner() 方法
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// mResponseIndex 默认为 0, 当没有添加文件描述符监控时 mResponses.size() 也为 0,跳出 while 循环,
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
// result 为 0
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 调用 pollInner() 方法
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
// timeoutMillis 为 0
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
// 清理之前的结果
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// timeoutMillis 为 0 ,不阻塞,直接返回,没有消息 eventCount 为 0
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// 默认为 false
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// eventCount < 0 说明发生了错误
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// eventCount 为 0 说明超时
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
// 有eventCount个文件描述符有IO事件,包括 EventFd ,
// 序列号为 WAKE_EVENT_FD_SEQ ,这个专用于消息机制的
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 通过序列号查找到文件描述 request
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 把 request 的IO 事件结果 ,保存到 Vector 里面
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
" that is no longer registered.",
epollEvents, seq);
}
}
}
Done: ;
// 没有发送 Native 消息的话,mMessageEnvelopes.size() 为 0
// 如果有发送 Native 消息的话 ,进行处理
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
// 当到 Native 消息执行时间时
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
// 获取 Message
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
// 调用 handler handleMessage()方法处理消息
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// 没有监控文件描述符IO事件时 mResponses.size() 为 0
// 当添加了监控文件描述符IO事件,并且文件描述符有活动,
// 并且添加了IO事件接口回调 (response.request.ident == POLL_CALLBACK)
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 调用对应的接口回调方法 callback.handleEvent()方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
AutoMutex _l(mLock);
removeSequenceNumberLocked(response.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
// 返回 POLL_TIMEOUT
return result;
}
// 读取 eventfd,恢复状态
void Looper::awoken() {
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
Message
- Message 实现了 Parcelable 可以跨进程传递
- Message 有个消息池,属于链表结构,大小为 50,Message 对象可以重复使用,避免频繁创新,销毁导致GC。
public final class Message implements Parcelable {
private static Message sPool; // 消息池链表表头
private static int sPoolSize = 0; // 消息池链表默认大小
private static final int MAX_POOL_SIZE = 50; // 消息池链表最大长度
public int what; // 用于定义的消息代码,以便 Handler 可以识别该消息的内容
public int arg1; // 用于携带消息参数 1,简单的 int 类型参数可以用此字段存储
public int arg2; // 用于携带消息参数 2,简单的 int 类型参数可以用此字段存储
public Object obj; // 通用消息参数,方便使用,如果不是 Parcelable ,无法跨进程传递
public Messenger replyTo; // 用于跨进程发送消息
int flags; // 用于标记 是否异步消息和 消息是否正在使用
public long when; // 消息的执行时间
Bundle data; // 需要传递的大的,复杂数据,可跨进程传递
Handler target; // 处理消息的 Handler
Runnable callback; // 把可执行的代码放入 Message 当中,传递给线程执行
Message next; // 下一个消息
public final Message obtainMessage()
{
return Message.obtain(this);
}
public static Message obtain(Handler h) {
Message m = obtain(); //获取消息
m.target = h;
return m;
}
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) { // 如果消息池中有消息,就从中获取
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message(); // 没有就新建一个消息
}
// 消息处理完成,Looper 会调用回收消息
void recycleUnchecked() {
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = UID_NONE;
workSourceUid = UID_NONE;
when = 0;
target = null;
callback = null;
data = null;
// 当消息池没满时,处理后的消息进行回收,放入消息池
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
}
4. 其他线程通过 Handler 发送消息
// Handler.java
public final boolean sendMessage(Message msg) {
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
// 消息加入到 MessageQueue , msg.target 设置为 当前的 Handler
private boolean enqueueMessage(MessageQueue queue, Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true); //设置异步标识
}
return queue.enqueueMessage(msg, uptimeMillis);
}
// MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
// 消息链表表头为空,when 为 0,when 小于表头消息的 when ,把此msg放入消息头部,优先处理
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked; // 如果目标线程被阻塞,应该唤醒
} else {
// 如果目标线程被阻塞 p.target == null 说明是消息屏障,msg是异步消息需要,应该唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 查找到该消息适合插入的位置
for (;;) {
prev = p;
p = p.next;
// 下一个消息为空,或者下一个消息的执行时间比 when 晚,跳出循环
if (p == null || when < p.when) {
break;
}
// 有消息屏障,这个时候只会处理异步消息,避免重复唤醒
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 插入链表
msg.next = p;
prev.next = msg;
}
// 如果需要唤醒目标线程,调用nativeWake唤醒线程
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
// android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
// 调用 Looper wake() 方法
void NativeMessageQueue::wake() {
mLooper->wake();
}
// Looper.cpp
// 唤醒线程,只需写入mWakeEventFd 写入 1 ,文件描述符可写,epoll_wait 方法返回
void Looper::wake() {
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
Native 消息机制
- 结构体 Message 相当于 java 中 Message
- Native MessageHandler 只相当于 java 中 Handler.Callback 一个接口回调
- Native Looper 结合了 java 中 Handler Looper ,MessageQueue 三者的功能集合
- Native Looper 中成员变量 Vector<MessageEnvelope> mMessageEnvelopes; java 中 MessageQueue 的功能
- Native Looper 中 Looper::sendMessage()类方法 与 java 中 Handler
- Native Looper 更是 java 中 Looper 消息循环底层实现
// native 消息 比较简单,只有一个成员 what
struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }
int what;
};
// 包装 执行时间,handler,Message
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t u, sp<MessageHandler> h, const Message& m)
: uptime(u), handler(std::move(h)), message(m) {}
nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};
// native 层的 Handler 只有一个 handleMessage 方法
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();
public:
virtual void handleMessage(const Message& message) = 0;
};
// 只有一个 handleEvent 方法
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();
public:
virtual int handleEvent(int fd, int events, void* data) = 0;
};
// 发送消息
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
// 发送消息,延迟发送
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
// 封装成 MessageEnvelope
MessageEnvelope messageEnvelope(uptime, handler, message);
// 插入数组
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// 如果正在处理消息,直接返回,不需要唤醒线程
if (mSendingMessage) {
return;
}
} // release lock
// 只有头部插入新消息才唤醒线程
if (i == 0) {
wake();
}
}
// 移除所有 MessageHandler 处理的消息
void Looper::removeMessages(const sp<MessageHandler>& handler) {
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
// 移除所有 MessageHandler 处理的并且类型为 what 的消息
void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
文件描述符监控
epoll 是 Linux下多路复用IO接口,可以监听文件描述符的读写以及错误事件,只是用于 handler 消息循环机制(监听 EventFd 文件描述符)有点大材小用,为此 Android 添加了监控文件描述符接口
java 接口封装
java 层接口在 MessageQueue.java 文件中
// MessageQueue.java
// native 方法
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
public interface OnFileDescriptorEventListener {
// 可写事件
public static final int EVENT_INPUT = 1 << 0;
// 可读事件
public static final int EVENT_OUTPUT = 1 << 1;
// IO 错误事件
public static final int EVENT_ERROR = 1 << 2;
// 当文件描述符有读写以及错误事件会回调此方法
int onFileDescriptorEvents(FileDescriptor fd, int events);
/*
返回值: 一个新的需要监听的IO事件类型,或 0 取消注册侦听器
fd : 发生IO事件的文件描述符
events : IO事件类型,读写以及错误事件
*/
}
/*
fd : 需要监听的文件描述符
events : 需要监听的IO事件
listener : 回调接口
*/
// 添加 OnFileDescriptorEventListener 接口
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,
@OnFileDescriptorEventListener.Events int events,
@NonNull OnFileDescriptorEventListener listener) {
if (fd == null) {
throw new IllegalArgumentException("fd must not be null");
}
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
synchronized (this) {
updateOnFileDescriptorEventListenerLocked(fd, events, listener);
}
}
/*
需要移除的文件描述符
*/
// 移除 OnFileDescriptorEventListener 接口
public void removeOnFileDescriptorEventListener(@NonNull FileDescriptor fd) {
if (fd == null) {
throw new IllegalArgumentException("fd must not be null");
}
synchronized (this) {
updateOnFileDescriptorEventListenerLocked(fd, 0, null);
}
}
// 以文件描述符的 int 值为 key,value 为 FileDescriptorRecord
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
// 封装了 文件描述符,IO事件类型和 OnFileDescriptorEventListener,
private static final class FileDescriptorRecord {
public final FileDescriptor mDescriptor;
public int mEvents;
public OnFileDescriptorEventListener mListener;
public int mSeq;
public FileDescriptorRecord(FileDescriptor descriptor,
int events, OnFileDescriptorEventListener listener) {
mDescriptor = descriptor;
mEvents = events;
mListener = listener;
}
}
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
OnFileDescriptorEventListener listener) {
final int fdNum = fd.getInt$(); // 拿到文件描述符的 int 值
int index = -1;
FileDescriptorRecord record = null;
// 如果 mFileDescriptorRecords 不为空
if (mFileDescriptorRecords != null) {
// 从 SparseArray 查找 FileDescriptorRecord
index = mFileDescriptorRecords.indexOfKey(fdNum);
if (index >= 0) {
// 如果索引大于等于 0 说明有这个文件描述符监控记录
record = mFileDescriptorRecords.valueAt(index);
// 如果监听的的IO事件类型是一样的,说明属于重复调用,直接 返回
if (record != null && record.mEvents == events) {
return;
}
}
}
// 如果不为 0,说明是添加监听
if (events != 0) {
// 默认都添加IO错误事件监听
events |= OnFileDescriptorEventListener.EVENT_ERROR;
// 没有监听IO事件记录
if (record == null) {
// 如果 mFileDescriptorRecords 为 null ,就进行初始化
if (mFileDescriptorRecords == null) {
mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
}
// 创建 FileDescriptorRecord
record = new FileDescriptorRecord(fd, events, listener);
// 添加到 SparseArray 中
mFileDescriptorRecords.put(fdNum, record);
} else {
// 如果之前有监听IO事件记录就进行更新 listener 和 events,并且 mSeq 记录序列号加 1
record.mListener = listener;
record.mEvents = events;
record.mSeq += 1;
}
// 调用 native 方法,epoll 添加监听
nativeSetFileDescriptorEvents(mPtr, fdNum, events);
// 如果为 0,说明是移除监听。
} else if (record != null) {
// record != null 说明之前有监听IO事件记录需要移除,并调用 native 方法 epoll 添加监听
record.mEvents = 0;
mFileDescriptorRecords.removeAt(index);
nativeSetFileDescriptorEvents(mPtr, fdNum, 0);
}
}
/*
fd : 发生IO事件的文件描述符
events :IO事件的类型
*/
// 发生 IO事件 用于 Native 方法回调
private int dispatchEvents(int fd, int events) {
// Get the file descriptor record and any state that might change.
final FileDescriptorRecord record;
final int oldWatchedEvents;
final OnFileDescriptorEventListener listener;
final int seq;
synchronized (this) {
// 通过回调的文件描述符 int 值,获取 FileDescriptorRecord
record = mFileDescriptorRecords.get(fd);
// record 为 null,说明没有注册OnFileDescriptorEventListener监听 ,直接返回
if (record == null) {
return 0;
}
// 和 FileDescriptorRecord 中感兴趣的 IO事件
oldWatchedEvents = record.mEvents;
events &= oldWatchedEvents;
// 如果为 0 ,说明可能更新了 OnFileDescriptorEventListener,直接返回
if (events == 0) {
return oldWatchedEvents;
}
listener = record.mListener;
seq = record.mSeq;
}
// 调用 OnFileDescriptorEventListener.onFileDescriptorEvents() 方法
int newWatchedEvents = listener.onFileDescriptorEvents(
record.mDescriptor, events);
// 返回不为 0 的,默认添加IO错误事件
if (newWatchedEvents != 0) {
newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
}
// 如果新监听事件和之前记录的不一样,更新或者移除 FileDescriptorRecord
if (newWatchedEvents != oldWatchedEvents) {
synchronized (this) {
int index = mFileDescriptorRecords.indexOfKey(fd);
if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
&& record.mSeq == seq) {
record.mEvents = newWatchedEvents;
if (newWatchedEvents == 0) {
mFileDescriptorRecords.removeAt(index);
}
}
}
}
// 返回新的需要监听的 IO事件
return newWatchedEvents;
}
Native 接口实现
// NativeMessageQueue.cpp
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
// events 不为 0
if (events) {
int looperEvents = 0;
// 文件描述符可读事件
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
// 文件描述符可写事件
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
// 调用 Looper 的addFd()方法 ,NativeMessageQueue 继承了 LooperCallback
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast<void*>(events));
} else {
// events 为 0 移除文件描述符监控
mLooper->removeFd(fd);
}
}
// 发生IO事件回调,Looper 会调用handleEvent()方法,handleEvent()方法把结果回调到Java层
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
int events = 0;
// IO可读事件
if (looperEvents & Looper::EVENT_INPUT) {
events |= CALLBACK_EVENT_INPUT;
}
// IO可写事件
if (looperEvents & Looper::EVENT_OUTPUT) {
events |= CALLBACK_EVENT_OUTPUT;
}
// IO错误事件
if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
events |= CALLBACK_EVENT_ERROR;
}
int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
// 调用 java 层 MessageQueue.dispatchEvents()
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
// 方法返回值为 0 Looper 会把监控的文件描述符移除
if (!newWatchedEvents) {
return 0;
}
// 如果返回的新监听的IO事件 与之前的不一致,重新设置
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
// 返回 1 会保留之前的监听状态
return 1;
}
// Looper.cpp
// NativeMessageQueue 调用了 addFd
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
sp<SimpleLooperCallback> looperCallback;
if (callback) {
looperCallback = sp<SimpleLooperCallback>::make(callback);
}
return addFd(fd, ident, events, looperCallback, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK; // 有 LooperCallback 设置请求类型
}
{ // acquire lock
AutoMutex _l(mLock);
// WAKE_EVENT_FD_SEQ 为特别指定的消息循环序列号 ,序列号加 1
if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
const SequenceNumber seq = mNextRequestSeq++;
// 监听文件描述符IO事件封装成为一个 Request 结构体
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.callback = callback;
request.data = data;
// 创建 epoll_event 对象
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
// 从 通过文件描述符从 map中获取到 序列号
auto seq_it = mSequenceNumberByFd.find(fd);
// 如果未找到,说明该文件描述符是第一次添加到 epoll监听
if (seq_it == mSequenceNumberByFd.end()) {
// 添加到 epoll 监听
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
// 发生错误返回打印错误日志并返回 -1
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
// 把序列号为key,Request为 value 放入 map
mRequests.emplace(seq, request);
// 把文件描述符为key,序列号为value 放入 map
mSequenceNumberByFd.emplace(fd, seq);
} else {
// 通过文件描述符找到序列号,说明之前就已经添加到 epoll 监听只需要更改监听的 IO 事件类型
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
// 发生错误
if (epollResult < 0) {
// 错误类型为 ENOENT 说明我们修改不在 epoll兴趣列表中的文件描述符。
// 出现这种情况的原因在于,当关闭一个文件描述符会自动将其从所有的 epoll 实例的兴趣列表中移除 ,但是 mSequenceNumberByFd 并不会删除,
// 并且文件描述符是一种系统资源,关闭后会重新分配,因为这意味着较旧的文件描述符在其回调之前已关闭,
// 同时已创建具有相同编号的新文件描述符,现在正在首次注册,就会出现这种情况。
if (errno == ENOENT) {
// 重新添加 epoll 监听
epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
// 发生错误返回打印错误日志并返回 -1
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
// 由于内核限制,我们需要从头开始重建 epoll 集,因为它可能包含一个旧的文件句柄,我们现在无法删除,因为它的文件描述符不再有效
// 发生错误,重新构建 epoll 监听
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
const SequenceNumber oldSeq = seq_it->second;
mRequests.erase(oldSeq);
mRequests.emplace(seq, request);
seq_it->second = seq;
}
} // release lock
return 1;
}
// 移除文件描述符监控
int Looper::removeFd(int fd) {
AutoMutex _l(mLock);
// 通过文件描述符拿到序列号
const auto& it = mSequenceNumberByFd.find(fd);
// 未找到说明已被移除直接返回 0
if (it == mSequenceNumberByFd.end()) {
return 0;
}
// 找到序列号,调用通过序列号移除 removeSequenceNumberLocked 方法真正移除
return removeSequenceNumberLocked(it->second);
}
int Looper::removeSequenceNumberLocked(SequenceNumber seq) {
const auto& request_it = mRequests.find(seq);
if (request_it == mRequests.end()) {
return 0;
}
const int fd = request_it->second.fd;
// 把相应的文件描述符和序列号从两个map中删除,先从map数据中删除,
// 而不是epoll文件描述符监控集
mRequests.erase(request_it);
mSequenceNumberByFd.erase(fd);
// epoll 移除文件描述符
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
// 移除失败, 重建 epoll 文件描述符监控集
if (epollResult < 0) {
if (errno == EBADF || errno == ENOENT) {
scheduleEpollRebuildLocked();
} else {
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
return 1;
}
其他
-
主线程的 Looper 无限循环为什么不会导致应用ANR?
ANR 是 Android 系统监控第三方 app 响应操作是否超时机制。
- Service timeout
- 前台服务20s
- 后台服务200s
- Broadcasequeue Timeout
- 前台广播10s
- 后台广播 60s
- ContentProvider timeout 10s
- InputDispatching Timeout: 5s
只有主线程响应超时才会引起ANR,而这些操作都会封装成消息,加入到主线程 MessageQuenue 进而唤醒主线程执行操作。只要在规定的时间内完成操作并不会造成 ANR .
- 为啥主线程不需要我们创建Looper 因为在app启动时,main 函数的时候就已经创建了 Looper,并且调用了 Looper.loop()方法
- 之前 Handler有个默认构造函数,不需要参数,会通过 Looper.myLooper()获取当前线程的Looper对象 在 Handler 构造期间隐式选择 Looper 可能会导致错误。
- 当前线程没有调用 Looper.prepare()创建Looper 时会崩溃
- 如果创建 Handler的地方会被多个线程调用,Handler 关联的线程可能不是预期的
转载自:https://juejin.cn/post/7294468438391504947