Handler 原理分析
前言
为什么写这篇文章?因为我忘了,2个月前把Handler源码已经读烂了,2个月后....
也许是疫情原因,公司效益不好,开启了一轮裁员吧,又得找工作了,但是我Handler已经忘光光了,只记得大概的原理,甚至连同步屏障的机制作用都忘光光了。然后,一个重点是,很迷茫,不知道干什么,不知道看什么知识。所以,写篇文章吧,一边是梳理下Handler知识,一边让自己迷茫的心有个努力的方向,至少今天输出一篇文章了(ps:一篇可能已经被写烂的题材了)。给看这篇文章的读者一个建议是,看的时候最好再看看源码吧,Handler了相比于AMS,PMS,WMS这些,算是能很直观的看到整个代码逻辑的了。废话不多说了,接下来是内容干货了。(本文代码是kotlin和java的混合)
本文主要从以下几个方面讲解Handler。
- 为什么handler能实现线程程间的通信 (原理,前提),实现(Message,MessageQueus,Handler)
- 为什么loop不会导致ANR.
- Handler的同步屏障机制
- Handler的一些其他方法使用
1. Handler机制的前提与基本思想
先整个大前提吧。
在系统中,进程是资源的分配单位,而线程是CPU的分配单位。因此一个进程中的所有线程所拥有的资源,如内存空间等是共享的。因此多个线程可以对同一个内存对象进程操作.而Handler的实现就是通过一个MessageQueue
队列进行数据的存储读取操作实现线程间的通信。A线程中通过创建一个Looper,调用loop函数,loop方法中不断去读取MessageQueue
的的Message对象,而Message
对象是其他线程创建(或其自身线程内创建的)对象,其中存储了消息数据,Message
的target
属性存储了对应的handler对象,通过调用Handler::dispatchMessage
方法去在A线程内处理其他线程告知其要进行的操作。
2. 一个Handler 的创建
val handlerThread = HandlerThread("io-thread")
handlerThread.start()
val handler = object : Handler(handler.looper) {
override fun handleMessage(msg: Message) {
//处理逻辑
}
}
val msg = Message.obtain().apply{
what = 1//要处理的消息
}
handler.sendMessage(msg)
正常情况下正确创建一个子线程的Handler 方式如上代码所示。HandlerThread
是Thread
的子类。
public class HandlerThread extends Thread {
int mPriority;
int mTid = -1;
Looper mLooper;
private @Nullable Handler mHandler;
public HandlerThread(String name) {
super(name);
mPriority = Process.THREAD_PRIORITY_DEFAULT;
}
public HandlerThread(String name, int priority) {
super(name);
mPriority = priority;
}
@Override
public void run() {
mTid = Process.myTid();
Looper.prepare();// 代码1
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);//设置线程优先级
onLooperPrepared();// 默认为空的方法,可以根据需要在 loop 之前做一些必要的操作
Looper.loop();//代码2
mTid = -1;
}
public Looper getLooper() {
if (!isAlive()) {
return null;
}
boolean wasInterrupted = false;
synchronized (this) {
while (isAlive() && mLooper == null) {
try {
wait();
} catch (InterruptedException e) {
wasInterrupted = true;
}
}
}
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
return mLooper;
}
@NonNull
public Handler getThreadHandler() {
if (mHandler == null) {
mHandler = new Handler(getLooper());
}
return mHandler;
}
//... 省略
}
HandlerThread
的创建方式如上,主要是看下其run
方法的重写。其先是调用Looper.prepare();
创建Looper对象,再通过Looper.prepare
获取Looper对象并赋值给mLooper
属性,最后调用loop()
方法去执行。
3. Looper 的创建
3.1Looper.prepare
前面将了,线程通过Looper.prepare
创建了Looper对象。
class Looper {
final MessageQueue mQueue;
final Thread mThread;
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
}
如上,preapare
方法逻辑其实很简单,创建一个Looper对象并且保存在 ThreadLocal
对象中。quiteAllowed
这个值用于判断当前Looper是否可以停止。子线程创建的Looper传入的都是true,只有主线程创建时,该值为false. 因为对于一个Android app而言,显然 主线程显然是永远不能停止的。 在Looper
的构造方法中,其创建了 MessageQueue
对象,也是其Looper.loop()
方法所需要用到的对象。
MessageQueue
的构造方法如下,nativeInit()
是一个本地方法,具体后面再描述。此处我们应该知道的是MessageQueue
类如其名,就一个Message
的队列。
class MessageQueue {
private final boolean mQuitAllowed;
private long mPtr;
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
}
ThreadLocal
是为一个线程,创建一个 ***** 只属于它自己的变量副本***** ,线程可以改变自己所拥有的变量副本,而不会影响其他线程所对应的副本。ThreadLocal
具体原理比较简单,可以自行百度下。
3.2Looper.loop
在Looper.preaprea
方法中,创建了Looper对象,且Looper对象中创建了MessagqQueue
,在loop()
方法就是通过不断从MessageQueue
中读取MessageQueue
中读取Message
对象并处理了。loop()
代码如下:
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
if (me.mInLoop) {
Slog.w(TAG, "Loop again would have the queued messages be executed"
+ " before this one completed.");
}
me.mInLoop = true;// 标志其开始loop了
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
// Allow overriding a threshold with a system prop. e.g.
// adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride =
SystemProperties.getInt("log.looper."
+ Process.myUid() + "."
+ Thread.currentThread().getName()
+ ".slow", 0);
me.mSlowDeliveryDetected = false;
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
通过loopOnce
去不断处理Message。当loopOnce
返回false时,结束loop.
loopOnce
方法如下。
private static Observer sObserver;//可通过设置该对象,来监控所有的Looper 数据
public interface Observer {
Object messageDispatchStarting();
void messageDispatched(Object token, Message msg);
void dispatchingThrewException(Object token, Message msg, Exception exception);
}
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
//代码1
Message msg = me.mQueue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
// Make sure the observer won't change while processing a transaction.
final Observer observer = sObserver;
final long traceTag = me.mTraceTag;
long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
if (thresholdOverride > 0) {
slowDispatchThresholdMs = thresholdOverride;
slowDeliveryThresholdMs = thresholdOverride;
}
final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);
final boolean needStartTime = logSlowDelivery || logSlowDispatch;
final boolean needEndTime = logSlowDispatch;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
Object token = null;
if (observer != null) {
token = observer.messageDispatchStarting();
}
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
//代码2
msg.target.dispatchMessage(msg);
if (observer != null) {
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logSlowDelivery) {
if (me.mSlowDeliveryDetected) {
if ((dispatchStart - msg.when) <= 10) {
Slog.w(TAG, "Drained");
me.mSlowDeliveryDetected = false;
}
} else {
if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
msg)) {
// Once we write a slow delivery log, suppress until the queue drains.
me.mSlowDeliveryDetected = true;
}
}
}
if (logSlowDispatch) {
showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
return true;
}
以上是loop
方法的整个代码,为什么要列上面的代码,主要是想突出下 sObserver
这个属性对象,应用可以通过Looper::setObserver
方法传入Observer去全局监听进程中所有的Message
.
上述代码简化后
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
//代码1
Message msg = me.mQueue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}
try {
//代码2
msg.target.dispatchMessage(msg);
if (observer != null) {
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
//省略
//代码3
msg.recycleUnchecked();
return true;
}
- 代码1: 通过
MessageQueue::next
方法获得Message
,next()
方法在MessageQueue
为空的时候一般是阻塞的,具体实现后面再分析 - 代码2: msg的target属性中存储了 Handler 对象,遵循了谁调用谁处理的原则,
Handler::dispatchMessage
方法中会根据msg类型来判断是 一个 调用run
方法还是调用handleMessage
来进行消息的处理。具体见后面Message
和Handler
类的分析.
3.3 Looper 全局监控Message
如 3.2节 介绍,通过 setObserver
方法监控所有Message
.
一般可以用于监控一些耗时。
4. Message
结构
public final class Message implements Parcelable {
public int what;
public int arg1;
public int arg2;
public Object obj;
public Messenger replyTo;
public static final int UID_NONE = -1;
public int sendingUid = UID_NONE;
public int workSourceUid = UID_NONE;
/*package*/ static final int FLAG_IN_USE = 1 << 0;
/** If set message is asynchronous */
/*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;
/** Flags to clear in the copyFrom method */
/*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
int flags;
public long when;
Bundle data;
Handler target;
Runnable callback;
Message next;
public static final Object sPoolSync = new Object();
private static Message sPool;
private static int sPoolSize = 0;
private static final int MAX_POOL_SIZE = 50;
private static boolean gCheckRecycle = true;
}
以上是Message
类中的属性,重点解释下其中部分。
-
what : 一般用于在Handler::handleMessage中,判断消息类型,进行相应详细处理
-
arg1,arg2: 携带的相关参数
-
obj: 携带的对象,一般也是在
Handler::handleMessage
使用。需要注意,在Handler去进行移除MesssageQueue
中的未执行的Message
时 obj会被作为token去进行比对. -
replayTo: 用于跨进程通信。此处暂时不扩展
-
sendingUid,workSourceUid 用于跨进程通信时,记录Message发送者uid和接收者uid
-
flags: 标志Message状态,是否正在使用,是否是异步Message
-
when: Message在何时被处理.when存储的是自开机到现在的时间
-
data: 存储数据
-
target: 记录了目标Handler。在前面
Looper.loop
方法的中也讲过了,每个message最终都是交给其target记录的Handler,调用dispatchMessage 方法处理。 -
callback:用于记录Runnable对象。 Handler::post()方法中传入的
Runable
对象都是传给callback
属性,最终在Handler::dispatchMessage
中进行处理。如下方代码所示,当callback存在时,则调用handleCallback方法,最种调用到message.callback.run
.否则调用handleMessage
.mCallback
用于监听一个Handler的所有Message消息class Handler { final Callback mCallback; public void dispatchMessage(@NonNull Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } }
-
sPool: 使用链表的方式,保存被回收的Message.减少Message的创建,链表长度最长50个。其回收Message,采用头插法的方式
-
next: 用于记录下一个Message。在
MessageQueue
和Message
回收链表中用到。
以上就是Message
的介绍了。接下来就是Handler类的介绍了。
5. Handler
5.1 Handler 的创建
class Handler {
@Deprecated
public Handler() {
this(null, false);
}
@Deprecated
public Handler(@Nullable Callback callback) {
this(callback, false);
}
public Handler(@NonNull Looper looper) {
this(looper, null, false);
}
public Handler(@NonNull Looper looper, @Nullable Callback callback) {
this(looper, callback, false);
}
@UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553)
public Handler(boolean async) {
this(null, async);
}
//代码1
public Handler(@Nullable Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
//Handler2
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
// 这个方法是Android9 出现的,且并未标志为 hide. google 咋突然把异步放开给开发者用了
public static Handler createAsync(@NonNull Looper looper) {
if (looper == null) throw new NullPointerException("looper must not be null");
return new Handler(looper, null, true);
}
public static Handler createAsync(@NonNull Looper looper, @NonNull Callback callback) {
if (looper == null) throw new NullPointerException("looper must not be null");
if (callback == null) throw new NullPointerException("callback must not be null");
return new Handler(looper, callback, true);
}
public interface Callback {
boolean handleMessage(@NonNull Message msg);
}
@UnsupportedAppUsage
final Looper mLooper;
final MessageQueue mQueue;
@UnsupportedAppUsage
final Callback mCallback;
final boolean mAsynchronous;
@UnsupportedAppUsage
IMessenger mMessenger;// 一般用不到
}
Handler
的构造方法共有七种。不过最终调用调用到代码1处或代码2处。
需要注意的是,开头的两个无需传入Looper
的方法都废弃了,因为 这两个方法是很不安全的。也许你创建该Handler对象时在一个没有创建Looper的子线程里面,那时候可能就会导致应用崩溃了。显示的传入让开发者明确知道其用的是哪个线程来进行消息的处理。
Handler
的构造方法中确定了 mLooper
,mQueue
,mCallback
,mAsynchronous
四个属性的值。
- mLooper: 保存该Handler使用的线程Looper对象
- mQueue: 保存该Handler使用的MessageQueue
- mCallback: 在构造时传入的对象,可以用于拦截或监听没有callback 属性的
Message
。在前面**Message
结构** 这一章介绍了该方法在handleMessage
中的作用。 - mAsynchronous:该标志位用于标志该Handler是否是异步Handler.在该标志位下,所有
Message
一旦在被该Handler处理时,其flags
标志位都会被标志位异步Message.mAsynchronous
被使用的位置在Handler::enqueueMessage
方法中。稍后会讲到该方法。
5.2 Handler发送消息的通用方法
在应用开发中在通过Handler发送消息时,常用的方法有以下。
class Handler {
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean postAtTime(@NonNull Runnable r, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
public final boolean postAtTime(
@NonNull Runnable r, @Nullable Object token, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r, token), uptimeMillis);
}
public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
/** @hide */
public final boolean postDelayed(Runnable r, int what, long delayMillis) {
return sendMessageDelayed(getPostMessage(r).setWhat(what), delayMillis);
}
public final boolean postDelayed(
@NonNull Runnable r, @Nullable Object token, long delayMillis) {
return sendMessageDelayed(getPostMessage(r, token), delayMillis);
}
public final boolean postAtFrontOfQueue(@NonNull Runnable r) {
return sendMessageAtFrontOfQueue(getPostMessage(r));
}
public final boolean sendMessage(@NonNull Message msg) {
return sendMessageDelayed(msg, 0);
}
public final boolean sendEmptyMessage(int what){
return sendEmptyMessageDelayed(what, 0);
}
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}
public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageAtTime(msg, uptimeMillis);
}
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
//在指定时间处理数据
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
//让消息放入当前消息队列的头部,让其最先被执行
public final boolean sendMessageAtFrontOfQueue(@NonNull Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, 0);
}
public final boolean executeOrSendMessage(@NonNull Message msg) {
if (mLooper == Looper.myLooper()) {
dispatchMessage(msg);
return true;
}
return sendMessage(msg);
}
//消息的处理,在前面也讲过了
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
}
如上所示的一些方法。其中post的方法传入的Ruuable对象,都是创建一个 Message对象,赋值给callback属性。而sendMessage* 调用的是 sendMessageAtTime
方法,而sendMessageAtTime
则调用了 enqueueMessage
方法。所以重点还是enqueueMessage
方法的实现。
- sendMessageAtTime: 指定Message 被执行的最早时间,
uptimeMillis
参数最终是被赋值到Message
的when
属性中。 - sendMessageAtFrontOfQueue: 目的是让Message 去尽快的处理,通过将
Message
的when
值设置为0,则其优先级则是最高的,将在MessageQueue队列的头部。 - executeOrSendMessage: 根据判断,如果当前调用时所在线程与handler处理的线程是同一个,则不需要通过
MessageQueue
,直接调用dispatchMessage
方法处理。
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Handler 的enqueueMessage
方法如上所示,将Message.target
属性指向自身。根据前面讲过的mAsynchronous
属性来判断是否将所有的Message
都设置为异步消息。 关于异步消息的处理将在后面MessageQueue
的章节中介绍。
5.3 移除与查找Message
很多时候我们需要移除之前的Message
。比如当拉取数据后,要通知页面进行刷新,可能在此之前因为别的原因也发送了一个刷新页面的Message
,防止页面多次重复刷新,需要将之前的Message
移除。又或者在Activity 结束时,防止其内部Handler对象被线程持有导致Activity内存泄露,去移除所有Message
.
//移除所有Message.callback == r 且msg.target == this的对象
public final void removeCallbacks(@NonNull Runnable r) {
mQueue.removeMessages(this, r, null);
}
public final void removeCallbacks(@NonNull Runnable r, @Nullable Object token) {
mQueue.removeMessages(this, r, token);
}
//移除所有Message.waht == what的且msg.target == this的对象
public final void removeMessages(int what) {
mQueue.removeMessages(this, what, null);
}
public final void removeMessages(int what, @Nullable Object object) {
mQueue.removeMessages(this, what, object);
}
// 与上方不同,此处的Message.obj 与 object的判等是使用 equal方法判等对象是否相等。上面则是使用 == 判等对象是否同一个。
public final void removeEqualMessages(int what, @Nullable Object object) {
mQueue.removeEqualMessages(this, what, object);
}
// 根据Message.obj对象去匹配,移除数据。toeken为 null时,移除所有target为当前Handler的Message
public final void removeCallbacksAndMessages(@Nullable Object token) {
mQueue.removeCallbacksAndMessages(this, token);
}
public final boolean hasMessages(int what) {
return mQueue.hasMessages(this, what, null);
}
public final boolean hasMessagesOrCallbacks() {
return mQueue.hasMessages(this);
}
public final boolean hasMessages(int what, @Nullable Object object) {
return mQueue.hasMessages(this, what, object);
}
public final boolean hasEqualMessages(int what, @Nullable Object object) {
return mQueue.hasEqualMessages(this, what, object);
}
public final boolean hasCallbacks(@NonNull Runnable r) {
return mQueue.hasMessages(this, r, null);
}
这些方法不一一列举介绍了,对应的方法作业也在代码中注释了。
此处展示下 MessageQueue.removeCallbacksAndMessages
方法。其他的remove*方法也只是while 和 if的判等增删了一些what等的条件判断。下方的删除操作,则是一个经典的链表删除操作,采用了两步骤,先判断头部数据是否符合删除条件的,此时只用考虑当前访问结点,不断回收节点,移动mMessages
指向的对象,直到出现不合条件的。后面则不断查找p指向的next结点,判断其是否符合条件,符合的删除。
class MessageQueue {
//Message 链表的头部
Message mMessages;
void removeCallbacksAndMessages(Handler h, Object object) {
if (h == null) {
return;
}
// 对Message操作时,增删查都是需要使用 synchronized 锁住
synchronized (this) {
Message p = mMessages;
// Remove all messages at front.
while (p != null && p.target == h
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
}
5.4 Handler的一些不常用方法
除了post,send相关方法以外,Handler还有一些特殊用法.
5.4.1 runWithScissors
方法
该方法是一个Hide 方法,不建议应用层使用。该方法的作用是 保证Runanbl
中的代码在指定的线程中执行,在执行完传入的Ruunable
操作之前,保持当前调用者的线程堵塞。timeout 指定最长等待时间,超过指定时间后,不等待,值不大于0时一直等待。BlockingRunable
实现看代码吧。
public final boolean runWithScissors(@NonNull Runnable r, long timeout) {
if (r == null) {
throw new IllegalArgumentException("runnable must not be null");
}
if (timeout < 0) {
throw new IllegalArgumentException("timeout must be non-negative");
}
// 如果在当前线程,则直接执行run方法
if (Looper.myLooper() == mLooper) {
r.run();
return true;
}
//使用 BlockingRunnable,保证在r 执行结束前始终保持阻塞。
BlockingRunnable br = new BlockingRunnable(r);
return br.postAndWait(this, timeout);
}
private static final class BlockingRunnable implements Runnable {
private final Runnable mTask;
private boolean mDone;
public BlockingRunnable(Runnable task) {
mTask = task;
}
@Override
public void run() {
try {
mTask.run();
} finally {
synchronized (this) {
mDone = true;
notifyAll();
}
}
}
public boolean postAndWait(Handler handler, long timeout) {
// post 失败时返回false,如果出现这种情况,一般是Handler所在线程结束了。
if (!handler.post(this)) {
return false;
}
synchronized (this) {
if (timeout > 0) {
final long expirationTime = SystemClock.uptimeMillis() + timeout;
while (!mDone) {
long delay = expirationTime - SystemClock.uptimeMillis();
if (delay <= 0) {
return false; // timeout
}
try {
wait(delay);
} catch (InterruptedException ex) {
}
}
} else {
while (!mDone) {
try {
wait();
} catch (InterruptedException ex) {
}
}
}
}
return true;
}
}
在framework 中 ,WindowManagerService
的服务的初始化就是调用使用该方法的。
class WindowManagerService {
// 省略其他代码
private static WindowManagerService sInstance;
public static WindowManagerService main(final Context context, final InputManagerService im,
final boolean haveInputMethods, final boolean showBootMsgs, final boolean onlyCore,
WindowManagerPolicy policy) {
DisplayThread.getHandler().runWithScissors(() ->
sInstance = new WindowManagerService(context, im, haveInputMethods, showBootMsgs,
onlyCore, policy), 0);
return sInstance;
}
}
6. MessageQueue
6.1 MessageQueue
的创建
MessageQueue
类在Looper
的构造方法中被创建。
class Looper {
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
}
再来看下MessageQueue.java
构造方法
public final class MessageQueue {
private final boolean mQuitAllowed;//
private long mPtr; //
Message mMessages;// 指向Message链表的头结点
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
/*IdeleHandler列表*/
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;
private boolean mBlocked;
// The next barrier token.
// Barriers are indicated by messages with a null target whose arg1 field carries the token.
@UnsupportedAppUsage
private int mNextBarrierToken;
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
public static interface IdleHandler {
boolean queueIdle();
}
}
- mQuitAllowed:判断是否允许MessageQueue退出,该参数是在Looper创建时传入的
- mPtr: 其指向的是c++ 层的MessageQueue类对象。后面要处理native 层的Message时使用指针。(在 native层也是有消息队列的)
- mIdleHandlers: 要处理的
IdleHandlers
列表。IdeleHandler
一般用于处理低延时性的任务,其在检测到消息队列中没有需要立即处理的消息时,才会去处理IdealeHandler
。 - mPendingIdleHandlers: 用于存储即将要被执行的IdleHandlers.
- mQuitting: 标志当前消息队列状态,是否已停止
- mBlocked: 在每次next时记录当前是否需要阻塞等待。
- mNextBarrierToken: 用于记录消息屏障的token,每次都是递增的值,mNextBarrierToken++后的值保存在
msg.arg1
中.后面根据token值移除消息屏障.
6.2 同步消息屏障
6.2.1 作用与原理
-
异步消息的创建:
- 通过调用
Message::setAsynchronous()
设置为异步Message
- 通过异步
Handler
设置,异步Handler
中所有消息都被设置为异步Message
- 通过调用
-
同步消息屏障作用:通过消息屏障,让同步消息不被执行,只执行异步消息。通过这种方式,能让一些需要被及时处理的一组异步Message被及时执行。
-
实现原理: 在
MessageQueue
的某个位置放一个 target 属性为 null 的 Message,确保此后的非异步 Message 无法执行,只能执行异步 Message。 -
应用:
View重绘:
ViewImpl::scheduleTraversals
中使用同步屏障,确保View重绘不受到队列中消息影响。屏幕刷新 :
Choreographer
就使用到了同步屏障,确保屏幕刷新事件不会因为队列负荷影响屏幕及时刷新。
6.2.2 使用
消息屏障的设置
int token = mHandler.getLooper().getQueue().postSyncBarrier();
异步消息发送
msg = Message.obtain();
msg.what = {目标值};
msg.setAsynchronous(true);
mHandler.sendMessage(msg);
消息屏障的移除
mHandler.getLooper().getQueue().removeSyncBarrier(token);
注意: 同步屏障的
postSyncBarrier()
方法是不公开方法,只能通过反射去调用。一般情况下,也不建议使用。就算是android系统中,也基本没几处使用过。
6.2.3 实现代码
@UnsupportedAppUsage
@TestApi
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
// 根据时间,插入到指定位置
private int postSyncBarrier(long when) {
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
@UnsupportedAppUsage
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
6.3 next()
与 enqueueMessage
方法的实现
在MessageQueue.java
中next
方法与 enqueueMessage
方法的实现是核心
6.3.1 next
方法
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//代码1 该方法是一个本地方法,其内部去处理native的Message以及根据传入的nextPollTimeoutMillis决定当前底层应该阻塞的时间
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 代码 2 遇到同步屏障,只查询异步消息
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
//代码3 存在消息,但当前不是该Message执行的时间点,计算到Message执行的时间差
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {// 代码4 存在需要立即处理的消息
mBlocked = false;
if (prevMsg != null) {// prevMgs 不为空的情况只出现同步屏障情况下,此时msg指向的异步Message
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
//代码5 将消息状态标志为正在使用,return 返回给Looper
msg.markInUse();
return msg;
}
} else {
//代码6 没有消息
nextPollTimeoutMillis = -1;
}
//代码7 如果当前已被标志为停止,则结束循环,dispose中释放所有Message,返回null去告知Looper结束loop循环。
if (mQuitting) {
dispose();
return null;
}
// 代码8 当前没有消息需要处理时,且是第一次进入IdleHandlerder的size判断时
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
// 代码9 当既没有Message需要处理,又没有IdleHandler需要处理时,将 mBlocked设置为true,告知当前Handler线程是被阻塞的。通过continue 结束本次,进入下次循环,执行到代码1处的 nativePollOnce 去让线程挂起一段时间.
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
//代码10 将 mPendingIdleHandlers 中数据拷贝到 mPendingIdleHandlers 数组中,这种方法的目的是为了更方便的移除`mIdleHandlers`数组链表中的数据
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}//synchronized 代码块底部
// 代码11 处理Handler.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // 此处是为了保证没有强引用执向对象
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
//代码12 当前IdelHandler的 queueIdle方法返回true,表示该方法不需要再被执行了,移除。
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
//代码13 清空阻塞时间。能执行到这一步时,说明是有IdelHandler处理的。否则在 代码9 处已经continue了。
nextPollTimeoutMillis = 0;
}//第一层 for 循环
}
next方法的具体实现机制,在上述通过代码注意差不多讲完了。
整理逻辑就是:
(1)遇到同步阻塞的Message时候,只查找异步消息。否则就去尝试处理mMessages
执向的头结点
(2) 当有消息需要在当前时间被处理时,返回消息给Looper. 当有消息,但是当前没有到达执行时间点时就计算出nextPollTimeoutMillis
,该值表示需要阻塞的时间;当mMessage
为空时,则nextPollTimeoutMillis = -1
,即永远等待。
(3) 当前无消息可处理时,去查询有无IdleHandle
要处理,有的话,就去处理IdleHandle
,且将nextPollTimeoutMillis=0
,即不阻塞,重新再次进入 消息的查找,即(1)。 如果无IdleHandle
,则会在 nativePollOnce
中被阻塞。
nativePollOnce
值为 0时不等待;大于1时,等待相应毫秒时间; -1时,用于等待。
6.3.2 enqueueMessage
方法
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
// 放入同一消息被重复放入
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
· //代码1当消息队列已停止,消息释放
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
//代码2 标志消息已被使用
msg.markInUse();
msg.when = when;//执行时间
Message p = mMessages;
boolean needWake;//用于判断当前是否需要唤醒当前线程
if (p == null || when == 0 || when < p.when) {
//代码3 当消息队列为空,或这个消息执行时间比当前消息晚时,将当前消息放入首位
// 如果当前Handler对应线程处于阻塞状态,则应该去唤醒
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {//代码4 消息插入位置为非头结点
//线程处于阻塞状态且属于 同步屏障状态,且压入的消息属于异步消息时,需要唤醒Handler线程
//为其他状态不用唤醒?消息不为异步消息时,且该消息又不是第一个消息时。第一个消息都还没达到执行状态,自然不用唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
//代码5 根据时间比对,插入消息对应位置
if (p == null || when < p.when) {
break;
}
// 代码6 当插入的消息不是第一个异步消息时,不需要唤醒
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {//代码7 唤醒阻塞的Handler线程
nativeWake(mPtr);
}
}
return true;
}
其整体逻辑如下:
(1) 检测Message 已经被标志为已使用,防止重复压入同一Message
(2) 检测 消息队列已被外部调用停止了,停止了则压入失败
(3)据Message
执行时间压入 对应位置。且根据Message
压入的位置来决定是否要执行唤醒Handler线程。(使用nativeWake
方法,该方法后面介绍)
* 当新消息放入头部时,根据当前`mBlocked`判断是否要唤醒,在阻塞则唤醒
* 当新消息放入位置不在头部时,当且仅当 `mBlocked`为true,正处于同步屏障状态,压入的新消息是异步消息,且该异步消息在当前消息队列的异步消息中排在首位时,需要执行唤醒Handler线程操作。 其他状态都不需要唤醒线程。
7. 一些问题回答
7.1 为什么Looper.loop
不会卡死
因为 Looper.loop
会不断去读取Message
。当前没有消息处理时,其会进行等待。
其等待有两种:
(1) 无限等待
此消息队列中没有消息或 正处于同步消息屏障但没有异步消息,此时通过nativePollOnce
,最终调用epoll_wait(-1)
进行无限等待
(2) 有限等待
消息队列存在待处理消息,但还未达到执行时间,则计算到其执行时间应该等待的时长,epoll_wait
函数传入对应时长,等待相应时间
MessageQueue
何时被唤醒?
(1) 在有限等待状态,达到指定时间
(2) 不管是有限等待还是无限状态,当有新消息压入,根据消息属性决定是否唤醒。可看下 6.3.2 enqueueMessage
方法 处介绍。
7.2 MessageQeueue
如何保证并发的线程安全?
任何线程都可以通过 Handler 生产 Message 并放入 MessageQueue 中,可 Queue 所属的 Looper 在持续地读取并尝试消费 Message。如何保证两者不产生死锁?
Looper 在消费 Message 之前要先拿到 MessageQueue 的锁,只不过没有 Message 或 Message 尚未满足条件的进行等待前会事先释放锁, 具体在于 nativePollOnce() 的调用在 synchronized 方法块的外侧。
Message 入队前也需先拿到 MessageQueue 的锁,而这时 Looper 线程正在等待且不持有锁,可以确保 Message 的成功入队。入队后执行唤醒后释放锁,Native 收到 event 写入后恢复 MessagQueue 的读取并可以拿到锁,成功出队。
这样一种在没有 Message 可以消费时执行等待同时不占着锁的机制,避免了生产和消费的死锁。
7.3 为什么建议通过Message.obtain()
获取Message
Message
中采用了享元设计模式,如此能避免频繁创建Message
对象,更加高效。
Message
的享元池最大数量为50,且是当前进程所共用的。
结尾
暂时就这么多吧,后面再补充了。文章中MessageQueue
使用nativePollOnce
,nativeWake
来进行挂起线程与唤醒线程操作,之所以不使用 wait
,notify
等方法,是因为 Handler
机制不止是 java
层使用,native
层也使用到了该机制。
下一篇文章可能讲下native 中MessageQueue
、Looper
、Handler
的使用与原理吧。 或者是ThreadLocal
吧。
转载自:https://juejin.cn/post/7174771611294236730