EventBus解析
一、概述
事件通信(消息通信)是APP开发过程中常见的场景。 各级视图之间,视图和模型之间,再到模块与模块之间,都需要消息通信。
若以现实中的通信类比,距离不同,可选的通信策略也不同:
- 面对面:普通讲话;
- 距离稍远:大声呼喊;
- 几公里内:对讲机;
- 更远距离:电信网络,卫星电话,互联网等。
在软件开发中,组件的耦合程度,可类比于现实通信的距离。 简而言之,就是要找到媒介,方能建立通信。 如果组件之间有直接的引用关系,通过方法调用,变量传递,接口回调等方式即可; 否则,则需要通过系统组件或者“全局变量”来通信,其中“全局变量”又有作用域之分。
除了通信距离之外,还有其他方面。
- 通信模式:一对一、一对多、多对一、多对多。
- 通信时效性:
- 直接对话:对话时实时传递,对方会立即响应;
- 发消息/邮件:消息会保留记录,对方可以不需要立即处理。
若只考虑APP内部的通信,能最大程度兼容通信距离、通信模式和通信模式通信时效等各个维度的,当属事件总线(EventBus)。
EventBus也有多种实现,本篇重点分析一下 greenrobot 的 EventBus ,同时也简单分析一下其他事件总线。
二、EventBus简介
EventBus是适用于Android和Java的发布/订阅事件总线。
EventBus is a publish/subscribe event bus for Android and Java.
EventBus 的 Github 主页比较简单,主要包含EventBus描述,基本用法,引入方式等。 官网中有更详细的介绍,包含各种用法和注意要点。
这里我们简单回顾一下EventBus的用法。
2.1 事件定义
EventBus 的事件定义是通过定义 class 来实现的,用class类型来标识事件,用类字段来装载数据。
public static class MessageEvent { /* Additional fields if needed */ }
这样定义,好处如下:
- 发送事件和数据合二为一,事件只需传一个对象;
- 用类字段装载数据,相比于传Map, 可变参数,Object等,数据定义更加清晰。
不过,这样定义事件的前提,是发布者和订阅者都能引用到事件类。 通常情况下,都能找到耦合点存放事件类; 如果找不到,可以用String, byte[]这样的万能类型作为事件类型。
2.2 声明订阅方法
只有声明了订阅方法的类,才能成为“订阅者”:没有订阅方法的类,注册时会抛异常。
声明订阅方法的规则如下:
- 方法需是 public,非static, 非abstract, 参数只有一个;
- 添加
@Subscribe
注解。
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
// Do something
}
@Subscribe
注解有3层作用,
- 如果不使用注解处理器,则在运行时反射检索被
@Subscribe
注解的方法。 - 如果引入注解处理器,编译时会检索被
@Subscribe
注解且符合条件的方法,生成“订阅索引”。 - 通过注解参数,可以设定订阅方法的处理行为。
注解可选参数有:
参数 | 类型 | 含义 |
---|---|---|
threadMode | ThreadMode | 线程模式:指定通过何种执行方式调用订阅方法。 |
sticky | boolean | 粘性事件:当为true时,在注册方法可接收粘性事件(如果存在的话)。 |
priority | int | 优先级:事件可能对应多个订阅者,发布时按优先级和订阅时序的顺序发布。 |
这里我们先不展开,在后面实现分析部分再具体细说。
2.3 注册/反注册
EventBus.getDefault().register(subscriber);
EventBus.getDefault().unregister(subscriber);
注册和反注册通常和订阅者的生命周期相关:
可以在onCreate/onDestroy
中定义,
也可以在onStart/onStop
中定义,
或者如果订阅者生命周期和Application等同,也可以不用反注册。
2.4 发送事件
发送事件很简单,创建一个事件对象,传给post方法即可。
EventBus.getDefault().post(new MessageEvent());
2.5 订阅索引
订阅索引是 EventBus 3 引入的特性,其目的是加速注册订阅(register)。 关于订阅索引的使用,可参考EventBus官网:subscriber-index
简单来说,创建订阅索引,需要执行如下步骤:
-
引入注解处理器
apply plugin: 'kotlin-kapt' dependencies { def eventbus_version = '3.2.0' implementation "org.greenrobot:eventbus:$eventbus_version" kapt "org.greenrobot:eventbus-annotation-processor:$eventbus_version" } kapt { arguments { // 传入索引类的全路径名,当前模块添加了 @Subscribe 注解的方法会集成到此类中。 arg('eventBusIndex', 'com.example.myapp.MyEventBusIndex') } }
添加注解处理器后,编译时会检索被
@Subscribe
注解且符合条件的方法,生成“订阅索引”(一个包含了订阅者和订阅方法信息的类)。 -
在APP启动时,将注解处理器生成的索引类添加到 EventBus 中。
其中,第2步又分两种情况:
- 自行构建 EventBus 实例:
EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
- 注入默认EventBus实例
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus(); // Now the default instance uses the given index. Use it like this: EventBus eventBus = EventBus.getDefault();
EventBus eventBus = EventBus.builder()
.addIndex(new MyEventBusAppIndex())
.addIndex(new MyEventBusLibIndex())
.build();
添加索引不是必需的,如果不加索引,EventBus会通过反射检索订阅者的方法。
三、EventBus实现分析
3.1 基本要素
事件的订阅和发布,总体上有以下要素:
- 角色:发布者 (Publisher)、订阅者(Subscriber)
- 行为:订阅(register)、取消订阅(unregister)、 发布(post)
发布事件时,发布者的概念被弱化,事件(Event) 的概念被强化。
例如,登录账号后,发布“登录”事件:
EventBus.getDefault().post(new LoginEvent());
具体开发中,我们可以认为调用了post方法的主体是发布者(可以是类,或者模块)。 但在上面这行代码中,我们看到了工具(EventBus)、动作(post)和事件(LoginEvent),而没有看到发布者。
EventBus的源码中,用名为 Subscription
的类来描述一组订阅。
Subscription
中包含了订阅者(subscriber) ,订阅者所关注的事件(event) ,事件对应的方法(Method) ,以及其他参数。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
}
public class SubscriberMethod {
final Method method;
final Class<?> eventType;
final ThreadMode threadMode;
final boolean sticky;
final int priority
}
EventBus的订阅者和事件是多对多的关系:
- 一个订阅者可能关注多个事件 ;
- 一个事件可能被多个订阅者所关注。
EventBus允许(但不建议)在同一个类(订阅者)中注解两个同样参数(事件)的方法。
@Subscribe(threadMode = ThreadMode.MAIN)
public void methodB(Event2 event) {
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void methodC(Event2 event) {
}
EventBus 用两个 Map 来记录订阅关系(订阅者,事件,方法):
public class EventBus {
// 事件 -> 订阅方法列表,用于发布事件时检索订阅方法并执行
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 订阅者 -> 关注的事件,用于取消订阅时索引订阅者所关注的事件
private final Map<Object, List<Class<?>>> typesBySubscriber;
}
基于此,我们接下来简单分析一下EventBus订阅/发布的实现。
3.2 订阅/发布
3.2.1 基础架构
EventBus发布订阅的架构图如下。
关注点:
- 三个方法:
register
、unregister
、post
- 两个容器:
subscriptionsByEventType
、typesBySubscriber
精简后的源码如下:
public class EventBus {
private final SubscriberMethodFinder subscriberMethodFinder;
// 事件 -> 订阅方法列表
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 订阅者 -> 关注的事件
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 注册订阅
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 检索订阅者的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
// 记录订阅关系
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
}
subscriptions.add(newSubscription);
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
}
// 取消订阅
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
}
}
// 移除订阅关系
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
// 同一个事件可能对应订阅者的多个方法,所以这里匹配后没有立即break
if (subscription.subscriber == subscriber) {
subscriptions.remove(i);
i--;
size--;
}
}
// 这里可以优化(个人观点):
// 当 subscriptions.isEmpty()时,可执行 subscriptionsByEventType.remove(eventType),
// 以免 subscriptionsByEventType 堆积空List,降低检索效率
}
}
// 发布事件
public void post(Object event) {
postSingleEvent(event, postingState);
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
// eventInheritance(事件继承) 默认是true, 可通过builder设置
if (eventInheritance) {
// 遍历子类以及父类和接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
postSingleEventForEventType(event, postingState, clazz);
}
} else {
postSingleEventForEventType(event, postingState, eventClass);
}
}
private boolean postSingleEventForEventType(Object event,
PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
// 如果有关联当前event的方法,则发送事件
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postToSubscription(subscription, event, postingState.isMainThread);
}
return true;
}
return false;
}
// 执行发送
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据线程模型,使用不同的发布策略(这个后面再分析)
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
// 其他模式 ...
}
}
void invokeSubscriber(Subscription subscription, Object event) {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
}
}
EventBus的主体功能包含三个函数:
-
订阅 (regiester)
- 检索记录订阅者的方法,取其中添加了
@Subscribe
注解的方法; - 取方法参数的类型得到
eventType
, 取注解参数得到threadMode
,sticky
等参数; - 将“订阅者,事件,方法,以及其他参数”记录到
subscriptionsByEventType
和typesBySubscriber
两个Map中。
- 检索记录订阅者的方法,取其中添加了
-
取消订阅(unregister)
- 索引订阅者关注的事件列表,遍历事件,移除
subscriptionsByEventType
中关于此事件的订阅方法; - 从
typesBySubscriber
中移除订阅者以及所关联的事件列表。
- 索引订阅者关注的事件列表,遍历事件,移除
-
发布(post)
- 在
subscriptionsByEventType
中索引Event
所关联的Subscription
列表; - 遍历
Subscription
列表,执行其方法。 - 默认情况下,启用事件继承(eventInheritance) 。
- 在
3.2.2 事件继承
所谓“事件继承”,是指 :
如果方法订阅的事件类型是父类(或者接口),发布的事件类型是子类(或者实现),则方法能够收到该事件。
EventBus实现这个,是基于面向对象的继承和多态规则,即:允许将 子类类型的引用 赋值给 父类类型的引用。
举个例子:
假如有这样一个订阅方法,其订阅的类型是CharSequence
。
@Subscribe
fun test(param: CharSequence){
}
由于String
实现了CharSequence
,所以当有String
类型的事件发送时,此方法会收到事件。
其实现原理如下:
public class EventBus {
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
// eventInheritance(事件继承) 默认是true, 可通过builder设置
if (eventInheritance) {
// lookupAllEventTypes 返回类型本身,以及父类和接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
// 遍历类型列表
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
postSingleEventForEventType(event, postingState, clazz);
}
} else {
postSingleEventForEventType(event, postingState, eventClass);
}
}
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
// 循环查找,直到根类(Object)为止
while (clazz != null) {
eventTypes.add(clazz);
// 如果类实现了接口,则添加接口
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
// 递归查找接口的父类
// 即:如果接口有继承关系,则连同接口的父类一并添加
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
}
例如,发送一个String
类型事件,可以得到如下事件类型列表:
这个例子中,只有类型继承的关系,没有接口的继承关系。
按照源码的实现,如果接口有父接口,则会一并添加到列表中。
eventInheritance
可在构建 EventBus 实例时指定,默认为true。
大家常用的默认实例(EventBus.getDefault
)的eventInheritance
就是默认情况下就是true。
// EventBus.java
public class EventBus {
private final boolean eventInheritance;
EventBus(EventBusBuilder builder) {
eventInheritance = builder.eventInheritance;
}
}
// EventBusBuilder.java
public class EventBusBuilder {
boolean eventInheritance = true;
public EventBusBuilder eventInheritance(boolean eventInheritance) {
this.eventInheritance = eventInheritance;
return this;
}
}
EventBus之所以默认设置为true, 大概是因为:如果不默认设置为true,估计很多人都不会注意到这个特性。
但大多数情况下,其实发送者是有明确的意图的,发送者只想发送确定的类型给对应订阅者。
例如,登录模块会定义类似LoginEvent
之类的类型,
其发送事件时,只期望订阅了LoginEvent
类型的订阅者接收,而不期望被关注 Object
类型的订阅者接收。
这里我有个想法:
将eventInheritance
从全局变量改为post
方法的参数,则“两难自解”。
同时,通过方法重载,不传eventInheritance
参数的post
方法,默认为false;
如果明确想要订阅父类类型的方法能接收到事件,则调用post(event, true)
。
// 备注:以下代码并非源码,仅为个人想法
public void post(Object event) {
post(event, false);
}
public void post(Object event, boolean eventInheritance) {
postSingleEvent(event, postingState, eventInheritance);
}
这个小节的最后,关于事件的定义,有以下建议:
- 发送的事件为简单的数据类型,不要有太多的继承和实现(效率层面);
- 尽量用自定义的类型,尽量不要用
String
之类等“万能类型”,以确保清晰的订阅关系(维护层面)。
关于第2点的说明:
使用自定事件类型,通过检索事件类型的引用(Find Usages), 可以看到谁是订阅者,谁是发送者;
而使用String
类型,则因为String
类型有太多的引用,而难以找到订阅关系。
3.2 查找方法
3.3.1 查找流程
上一节我们看到,在注册订阅时,需要查找订阅者的订阅方法:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
}
查找方法的大概过程如下:
查找订阅方法的源码如下。
可关注三个方法:
findSubscriberMethods
:查找入口findUsingInfo
:优先用索引查找findUsingReflection
:直接反射查找
public class SubscriberMethodFinder {
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
// 订阅信息索引
private List<SubscriberInfoIndex> subscriberInfoIndexes;
private final boolean ignoreGeneratedIndex;
// subscriberInfoIndexes 由 EventBusBuilder 传入;
// 如果没有设定,则 subscriberInfoIndexes 为空。
SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean ignoreGeneratedIndex) {
this.subscriberInfoIndexes = subscriberInfoIndexes;
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
}
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 查找方法有两条路径:用反射查找,用索引查找
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
// 用索引信息查找方法列表
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
// 从子类到根父类,递归查找
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
// 如果有订阅方法信息(subscriberInfo),则通过 subscriberInfo 获取方法列表
// 如果没有,则还是通过反射查找
if (findState.subscriberInfo != null) {
// getSubscriberMethods 方法后面会分析,这里可以先跳过
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
// 获取 findState 中搜集的方法,并回收findState(用了池化技术)
return getMethodsAndRelease(findState);
}
// 从索引中获取订阅信息
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
// 用反射查找方法列表
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
// 从子类到根父类,递归查找
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
if (!findState.skipSuperClasses) {
findState.moveToSuperclass();
}
}
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
// 通过反射获取方法列表,暂存到 findState 中
// 这部分先不分析,留到 3.2.3 小节 }
}
}
概括来讲,查找订阅方法的过程,有两条路径:
- 反射查找:通过
clazz.getDeclaredMethods()
获取类的方法列表,过滤其中符合条件的方法,并封装SubscriberMethod
; - 订阅索引:通过 EventBusBuilder 传入的
subscriberInfoIndexes
的指引下构建SubscriberMethod
。
3.2.2 订阅者继承
无论是走反射还是索引,都要遍历当前类型到根类型(Object),解析各层级的方法列表。
while (findState.clazz != null) {
// 查找当前类方法.....
// 切换到父类
findState.moveToSuperclass();
}
之所以要解析订阅者的父类,是因为: 如果一个类的父类也声明了订阅方法,那注册这个订阅者时,也需要找到其父类的订阅方法。 例如:
open class SubscriberX {
@Subscribe()
fun functionX(event: A) {
Log.d("Tag", "functionX event:${event::class.simpleName}")
}
}
class SubscriberY : SubscriberX(){
@Subscribe()
fun functionY(event: B) {
Log.d("Tag", "functionY event:${event::class.simpleName}")
}
}
fun test() {
val subscriberY = SubscriberY()
EventBus.getDefault().register(subscriberY)
EventBus.getDefault().post(A())
EventBus.getDefault().post(B())
EventBus.getDefault().unregister(subscriberY)
}
// 输出:
// functionX event:A
// functionY event:B
订阅者的继承比事件的继承更简单一些: 事件的继承需要迭代查找父类,并且获取每一层类型所实现的接口,以及接口的父接口。 订阅者的继承只需要迭代查找父类, 因为接口中不能实现方法,不会存在订阅方法。
另外,为了加速查找,会过滤一些全限定名以java
,javax
,android
,androidx
等起头的类型。
void moveToSuperclass() {
if (skipSuperClasses) {
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
// Skip system classes, this degrades performance.
// Also we might avoid some ClassNotFoundException (see FAQ for background).
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") ||
clazzName.startsWith("android.") || clazzName.startsWith("androidx.")) {
clazz = null;
}
}
}
3.2.3 反射查找
反射查找的源码如下:
public class SubscriberMethodFinder {
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods,
// especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 获取当前类的所有方法,包括父类声明的方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 遍历方法列表,查找其中满足条件的订阅方法。
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 &&
(modifiers & (Modifier.ABSTRACT | Modifier.STATIC)) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 取出注解携带的参数,连同 method 和 eventType, 封装到SubscriberMethod
findState.subscriberMethods.add(
new SubscriberMethod(
method,
eventType,
subscribeAnnotation.threadMode(),
subscribeAnnotation.priority(),
subscribeAnnotation.sticky()
)
);
}
}
}
}
}
}
看起来代码不少,但逻辑比较简单。 就是获取类声明的方法,并挑选满足其中条件:
- public,非static, 非abstract, 参数只有一个;
- 声明了
@Subscribe
注解。
但看起来逻辑简单的操作,有时候会有较大的消耗。 比如下面这段代码:
class MainActivity : AppCompatActivity() {
@Subscribe
fun test(param: CharSequence){
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
enableEdgeToEdge()
setContentView(R.layout.activity_main)
ViewCompat.setOnApplyWindowInsetsListener(findViewById(R.id.main)) { v, insets ->
val systemBars = insets.getInsets(WindowInsetsCompat.Type.systemBars())
v.setPadding(systemBars.left, systemBars.top, systemBars.right, systemBars.bottom)
insets
}
findViewById<Button>(R.id.testBtn).setOnClickListener {
EventBus.getDefault().post("hello")
}
EventBus.getDefault().register(this)
}
override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(this)
}
}
看起来只声明的三个方法,事实上其方法不止3三个:
类中使用的一些的lambda方法,也会被 getDeclaredMethods
检索出来。
源码中关于 getDeclaredMethods
的注释是:
This is faster than getMethods, especially when subscribers are fat classes like Activities
重点在于 fat classes like Activities。
这个例子中,MainActivity
所继承的AppCompatActivity
,
其全限定名为:androidx.appcompat.app.AppCompatActivity
,正好是androidx
包下的类,所以不需要继续迭代查找父类。
实际开发中,Activity/Fragment通常自定义了多层继承,每一层又可能有多个方法。
故此,不建议用 Activity/Fragment 作为订阅者,因为这种对象类继承层级较多,方法也多(EventBus源码中称之为"fat classes") , 无论是注解处理器的编译时分析,还是反射的运行时遍历,代价都很高。 可以通过创建只包含订阅方法的类来作为订阅者。
例如:
class MainActivity : AppCompatActivity() {
inner class LoginEventHandler {
@Subscribe(threadMode = ThreadMode.MAIN)
fun onLogin(uid: Long) {
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun onLogout(uid: Long){
}
}
private val loginEventHandler = LoginEventHandler()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
EventBus.getDefault().register(loginEventHandler)
}
override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(loginEventHandler)
}
}
如此,订阅者不会过深的继承层级,也没有庞大的方法列表,相比直接用Activity/Fragment作为订阅者,必然是要快许多的。
3.3.4 订阅索引
曾几何时,EventBus查找订阅方法是对比方法名来实现的。
方法名为onEvent
、onEventXXX
, XXX为ThreadMode
。
后来订阅方法的定义换成了用注解声明,并通过注解传入threadMode,sticky,priority等参数。 再后来,引入了订阅索引(subscriber index) ,用以加速订阅方法的查找。
网上有种说法: “EventBus 3 可以在编译时获取信息,不需要在运行时反射”。
这种说法并不准确:
- 如果不用注解处理器&添加订阅索引,还是会通过反射查找方法;
- 即使添加了订阅索引,也只是不需要通过反射遍历订阅者的方法,创建订阅方法需要反射,发送事件时调用方法也需要反射。
例如,通过订阅索引创建订阅方法的代码如下:
public class SubscriberMethodInfo {
final String methodName;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
}
public class SimpleSubscriberInfo implements SubscriberInfo {
private final Class subscriberClass;
// 注解处理器生成的索引类中,会创建 SimpleSubscriberInfo 对象
public SimpleSubscriberInfo(Class subscriberClass) {
this.methodInfos = methodInfos;
}
protected SubscriberMethod createSubscriberMethod(String methodName, Class<?> eventType,
ThreadMode threadMode, int priority, boolean sticky) {
Method method = subscriberClass.getDeclaredMethod(methodName, eventType);
return new SubscriberMethod(method, eventType, threadMode, priority, sticky);
}
@Override
public synchronized SubscriberMethod[] getSubscriberMethods() {
int length = methodInfos.length;
SubscriberMethod[] methods = new SubscriberMethod[length];
for (int i = 0; i < length; i++) {
SubscriberMethodInfo info = methodInfos[i];
methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
info.priority, info.sticky);
}
return methods;
}
}
订阅索引只能提供基本信息,如方法名(methodName)和方法参数类型(eventType), Method 对象则需要在运行时反射获取 。
订阅索引虽然能带来订阅速度的提升,但也不是一本万利。 除了“添加注解处理器”和“分模块添加索引”比较麻烦之外,还有其他代价:
- 增加编译时间
- 增加冷启动时间
关于第2点的补充说明: 通过索引获取方法列表的方式,需在启动时添加索引,此过程中需要加载所有订阅者的类型; 而反射获取方法列表的方式,则不需要提前加载。
3.4 线程模式
线程模式这一块,有两个方面值得关注:
- 各种线程模式的含义和区别。
- 为了实现这些线程模式,EventBus做了些什么处理。
3.4.1 含义&区别
ThreadMode最初只有3种模式,几经迭代,如今有5种模式:
public enum ThreadMode {
POSTING,
MAIN,
MAIN_ORDERED,
BACKGROUND,
ASYNC
}
前面“订阅/发布”的分析中,可以看到,
调用post
方法后,经过一系列的运算之后,最终会调用到 postToSubscription
方法:
private void postToSubscription(Subscription subscription,
Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// 这个分支在 Android 平台可以忽略,
// 因为Android平台 mainThreadPoster != null
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
}
}
一共有4种订阅方法的执行方式:
- invokeSubscriber: 直接执行订阅方法
- mainThreadPoster: 将方法的执行 post 到主线程队列
- backgroundPoster: 将方法的执行 post 到串行执行的后台线程队列
- asyncPoster: 将方法的执行 post 到并发执行的后台线程队列
订阅方法的执行方式,取决于 订阅方法设定的线程模式 和 发布事件的线程。
汇总如下:
真一个让人眼花缭乱!
在我看来,POSTING、MAIN_ORDERED、ASYNC 三种模式已足够应付大多数场景。 而且这三种模式有明确执行方式:invokeSubscriber、mainThreadPoster、asyncPoster。 作为对比,MAIN 和 BACKGROUND 两种模式的执行方式飘忽不定(取决于发布事件的线程),增加了不确定性。
不过话说回来, MAIN 和 BACKGROUND 是 EventBus 最初支持的线程模式,为了向前兼容,这两种模式不能舍弃。
// EventBus项目,文件 ThreadMode.java 的第一条Git提交记录
public enum ThreadMode {
/** Subscriber will be called in the same thread, which is posting the event. */
PostThread,
/** Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */
MainThread,
/** Subscriber will be called in a background thread */
BackgroundThread
}
各种模式中,MAIN_ORDERED 模式出现的最晚。
关于 MAIN 和 MAIN_ORDERED 的区别,相信大家应该都很清楚。
相比于直接执行,post到队列后执行会有defer(推迟,延后)的效果。
比如视图执行onCreate
时,View树还没布局完成,一些操作要等布局完成后才进行,这种情况下就可以用post
推迟执行。
需要在主线程执行的订阅方法,有下面如下情形:
- 大部分不关注方法是在 “当前线程立即执行” 还是 “post到主线程队列后执行”;
- 不管发送事件的线程是否主线程,订阅方法都需要 “post到主线程队列后执行”;
对应处理:
- 在 “不确定发送的线程是否主线程” 的情况下,用 MAIN_ORDERED 模式不会出错。
- 如果明确地知道 “发送事件的线程是主线程” ,并且需要 “当前线程立即执行” ,可以用 POSTING 模式。
虽说 POSTING、MAIN_ORDERED、ASYNC 三种模式就基本够用,但不是说 MAIN 和 BACKGROUND 不能用。 就好比,正交坐标系(原点,x,y,z)足够描述空间,但非正交坐标系也能够描述空间,有时候还更好用。 比如,“不在意发送线程是主线程还是子线程,只需要订阅方法在主线程执行”,则用 MAIN 模式就很合适。
3.4.2 线程处理
线程处理又可以分为两部分:
- 实现 poster(mainThreadPoster、backgroundPoster、asyncPoster)
- 并发处理 & 线程安全
Poster的实现,简单概括就是 :
- mainThreadPoster:Handler + Looper.getMainLooper()
- backgroundPoster/asyncPoster: 线程池 + 队列
“Handler、线程池、 队列”,这些大家都太熟悉了,同时考虑文章篇幅,就不展开细说了。
这里主要分析EventBus在并发处理和线程安全方面的处理。
回顾一下EventBus订阅和发布的实现:
public class EventBus {
// 事件 -> 订阅方法列表
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 订阅者 -> 关注的事件
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 注册订阅
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 将订阅者和事件写入 subscriptionsByEventType & typesBySubscriber
}
}
// 取消订阅
public synchronized void unregister(Object subscriber) {
// 从 subscriptionsByEventType & typesBySubscriber 移除订阅者和事件
}
public void post(Object event) {
postSingleEvent(event, postingState);
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 省略部分代码 ......
postSingleEventForEventType(event, postingState, eventClass);
}
private boolean postSingleEventForEventType(Object event,
PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postToSubscription(subscription, event, postingState.isMainThread);
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
// 其他模式 ...
}
}
}
我们这次的关注点不再是如何组织订阅者、事件和方法,而是各种线程模式下的并发和线程安全。
发布/订阅用到两个容器 subscriptionsByEventType
和 typesBySubscriber
,
这两个容器并非线程安全容器,而且可能被多个线程访问,故而需要加锁以确保读写不出现异常。
register
方法中,在查找到订阅方法之后,剩余的代码都加锁 (12行) ;unregister
方法,给整个方法加锁 (18行) ;post
方法,只给subscriptionsByEventType
加锁 (34行) 。
这里有一些关注点:
- 为什么
subscriptionsByEventType
和typesBySubscriber
不用线程安全容器? - 为什么
subscriptionsByEventType
和typesBySubscriber
不分别加锁? - 为什么post方法只给
subscriptionsByEventType``.get(eventClass)
加锁(为什么不给整个方法加锁)?
先看第1个问题。
假设 subscriptionsByEventType
的容器换成 ConcurrentHashMap
,然后注释掉 synchronized
, 是否能实现“线程安全”?
答案是:不能。
按照如上假设,变更代码如下:
private final ConcurrentHashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
}
subscriptions.add(newSubscription);
}
假如有不同的订阅者,同时在两个线程初次订阅同一个事件,同时执行如上代码的6行,都返回了null,
则两个线程会分别创建两个list, 但最终只有一个list可以添到subscriptionsByEventType
(预期是两个list都能添加)。
上面这个case, 并不能实现“线程安全”(逻辑不正确)。
有经验的朋友估计已经看出来了,这个例子中,对subscriptionsByEventType
有“读”和“写”两个操作,
其中 “写” 依赖 “读”,需同时对两个操作加锁,方能确保正确性。
第2个问题,和第1个问题有类似的考虑:
subscriptionsByEventType
是记录“事件->订阅方法列表”的容器,
typesBySubscriber
是记录“订阅者->事件列表”表的容器;
unregister
方法中,subscriptionsByEventType
依赖typesBySubscriber
,所以需要同时加锁。
第3个问题,和锁粒度相关:
register
和unregister
方法中没有很耗时的操作,但post
中可能有。
当订阅方法的执行方式是 invokeSubscriber
时,订阅方法在执行post的线程上执行。
因此,post
方法中,只给 subscriptionsByEventType``.get(eventClass)
加锁,加锁范围不覆盖invokeSubscriber
。
于是,invokeSubscriber
不会占用对象锁,也就不会阻塞register
和unregister
方法来了。
同时我们也看到,从subscriptionsByEventType
取出的,是一个记录了事件所对应的方法列表的CopyOnWriteArrayList
容器。
register
和unregister
方法会更新这个列表(写入和删除);而post
方法在获到这个列表之后,需要遍历此列表。
因此,用CopyOnWriteArrayList
,可避免遍历的过程中写入而引发ConcurrentModificationException
。
3.4.3 有序性
EventBus的post
方法中,还有这么一段处理:
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
}
public class EventBus {
private final ThreadLocal<PostingThreadState> currentPostingThreadState =
new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isPosting = true;
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
}
}
}
}
EventBus通过ThreadLocal
为每个执行post
的线程创建了一个eventQueue,将在不同线程发送的事件,分组到各自线程的队列中。
当一个线程执行post方法:
- 如果当前线程没有在发送事件(
isPosting=false
),则将事件放入队列,然后启动循环(从队列中获取事件来发送); - 如果当前线程正在发送事件(
isPosting=true
),事件会被放入eventQueue
中,然后结束post
方法。
什么情况下,会出现“调用post方法,而当前线程正在发送事件”?
由于不同线程的 PostingThreadStat
是隔离的,那么只有一种可能 :
调用post方法之前,当前线程将isPosting
设置为true了 ;
当前的post
调用,发生在上一个Event的订阅方法中,且订阅方法是被直接执行的。
POSTING、MAIN 、BACKGROUND等模式,订阅方法都可能会被直接执行。
比方说下面代码:
class Subscriber {
@Subscribe(threadMode = ThreadMode.POSTING)
fun function1(x: String) {
EventBus.getDefault().post(Integer.valueOf(100))
Log.d("Tag", "x:$x")
}
@Subscribe(threadMode = ThreadMode.POSTING)
fun function2(y: Integer){
Log.d("Tag", "y:$y")
}
}
EventBus.getDefault().register(Subscriber())
EventBus.getDefault().post("hello")
假如没有队列(eventQueue
),会走一个“深度递归,先进后出”的执行顺序(16行 -> 4行 -> 10行 -> 5行),先输出y再输出x;
在有队列的情况下,会等function1
执行结束再触发function2
(先进先出),先输出x再输出y。
简而言之,ThreadLocal(eventQueue,isPosting)
的这套处理,可使得相同线程的事件能被有序地处理(先post的事件,先完成处理)。
不过,现实中在 "订阅方法被直接执行且订阅方法中调用post” 的情况其实很少见,
也就是,大多数情况下,eventQueue
的事件个数不会大于1,所以eventQueue
的发挥空间很有限,只是兜底的作用。
更多地发挥作用,使处理“有序”的,是mainThreadPoster
和backgroundPoster
的队列。
注意区别:
eventQueue
中存放的是 “事件” ;
mainThreadPoster
和 backgroundPoster
的队列中存放的是 “订阅方法” 。
实际开发中,大家其实较少关注事件处理的“有序性”,只要事件发生时能被处理到就行。 因此,可在有执行顺序的需求时,再来回顾一下线程模式相关机制。 另外,除了注意EventBus的线程模式外,在订阅者内部管理执行顺序也很重要。
3.5 粘性事件
所谓粘性事件,是指当事件发布后,即使在事件发布后才注册的观察者,也能够收到上一个已发布的事件。 粘性事件可以在某些情况下很有用,但也可能导致意外的行为,因此需要谨慎使用。
收到粘性事件的两个必要条件:
- 订阅方法的注解中,声明关注粘性事件:
@Subscribe`` (sticky = ``true``)
。 - 曾经用
postSticky
来发送过对应事件。
相关代码如下:
public class EventBus {
private final Map<Class<?>, Object> stickyEvents;
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 省略订阅事件的代码......
// 检查是否发送粘性事件
if (subscriberMethod.sticky) {
// 是否支持事件继承(默认为true)
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
// 如果订阅方法注册的事件可以被粘性事件赋值(类型相同或者粘性事件是子类),则发送事件
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
// 仅获取相同类型的粘性事件进行发送
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
private void checkPostStickyEventToSubscription(
Subscription newSubscription,
Object stickyEvent
) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
}
粘性事件的源码还是比较简单的,只有两个操作涉及粘性事件:
postSticky
:记录事件到stickyEvents
容器,然后正常发送;register
:在执行完订阅事件后,检查stickyEvents
容器中是否包含订阅方法所支持的事件,是则发送。
3.6 优先级
EventBus得优先级的实现很简单,注册时处理一下插入顺序,发送时按列表顺序遍历即可。
public class EventBus {
// 事件 -> 订阅方法列表
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
}
}
其核心处理,概括而言,就是找到方法列表后,从头开始比较,找到优先级小于当前订阅方法的位置,插入该方法的后面。 最终的效果是,一个事件对应的多个订阅方法:
- 若优先级不同,数值大者先被处理;
- 若优先级相同,先订阅者先被处理。
四、其他事件总线
Andriod平台大家比较熟悉“事件总线”工具,除了EventBus外,还有LiveEventBus(基于LiveData), RxBus(基于RxJava)等。 如果不限于“总线”的范畴,协程库的 SharedFlow/StataFlow 也是常见的事件通信工具。 基于内容聚焦的考虑,就不分析Flow了,简单分析一下其他的事件总线工具。
4.1 LiveDataBus
LiveEventBus是基于LiveData实现的事件总线工具。
LiveData用于事件通信,有一点需要留意:LiveData的事件通知,天然是sticky(粘性)的。 关于这一点,LiveEventBus关联的文章《Android消息总线的演进之路》也有提到:
“每注册一个新的订阅者,这个订阅者立刻会收到一个回调,即使这个设置的动作发生在订阅之前。”
“橘生淮南则为橘,生于淮北则为枳”。
LiveData 是 JetPack 组件之一,适用于Android的数据层构建,有数据变更通知,具备生命周期感知。 其注册时回调的特性,使得Activity/Fragment等因屏幕旋转而重建生命周期时可以恢复页面数据。 但如果用作事件通信的基础工具,就有一点“水土不服”了:很多情况下,只需要实时通知,不需要“粘性”事件。 如果有使用“消息总线”一类的工具,可以留意一下有多少场景是需要粘性事件的(应该不多)。
为了可以在淮北种出“橘”,大体有两种思路:
- 盖温室:通过继承/组合,新增Wrapper,增加一些处理(SingeLiveData,UnPeek-LiveData)。
- 改基因:通过反射,注册订阅时hook原生组件的部分状态 (LiveEventBus提到的方案)。
LiveEventBus 的核心代码:LiveEventBusCore
精简如下:
// LiveEventBus.java
public final class LiveEventBus {
public static <T> Observable<T> get(@NonNull String key, @NonNull Class<T> type) {
return LiveEventBusCore.get().with(key, type);
}
public static <T> Observable<T> get(@NonNull String key) {
return (Observable<T>)get(key, Object.class);
}
public static <T extends LiveEvent> Observable<T> get(@NonNull Class<T> eventType){
return get(eventType.getName(), eventType);
}
}
// Observable.java
public interface Observable<T> {
void post(T value);
void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer);
void removeObserver(@NonNull Observer<T> observer);
// 其他方法
}
// LiveEventBusCore.java
public final class LiveEventBusCore {
private final Map<String, LiveEvent<Object>> bus;
// 内部类
private class LiveEvent<T> implements Observable<T> {
private final String key;
private final LifecycleLiveData<T> liveData;
LiveEvent(@NonNull String key) {
this.key = key;
this.liveData = new LifecycleLiveData<>(key);
}
public void post(T value) {
liveData.setValue(value);
}
public void observe(@NonNull final LifecycleOwner owner, @NonNull final Observer<T> observer) {
liveData.observe(owner, new ObserverWrapper<>(observer));
}
private void removeObserver(@NonNull Observer<T> observer) {
liveData.removeObserver(observer);
}
}
public synchronized <T> Observable<T> with(String key, Class<T> type) {
if (!bus.containsKey(key)) {
bus.put(key, new LiveEvent<>(key));
}
return (Observable<T>) bus.get(key);
}
}
LiveData的实现是经典的观察者模式,支持一对多的关系,但要构成“总线”,还需再加一层Map。
private final Map<String, LiveEvent<Object>> bus;
但在key的抉择上,看起来作者是既想要String类型,又想要Class,但最终顾此失彼:
LiveEventBusCore
的with
方法,同时传入 key
和 type
, 但只用到key
而没有用到type
。
另外,源码中好像没有看到其文章说的hook操作(反射修改mVersion
)?
hook操作在另外一个目录的代码中:LiveEventBus.java
在 MavenCentral 上发布的 LiveEventBus 版本中,是没有hook操作的。
4.2 RxBus
RxBus 有多种实现,有复杂的有简单的,都依赖于RxJava。
RxJava适合做事件总线的大约是 PublishSubject
和 FlowableProcessor
。
前者可以获取Observable
,后者可获取Flowable
,Flowable
是更新一点的API,比Observable
多了背压处理。
RxBus 实现举例:
- 以
FlowableProcessor
构建的RxBus:github.com/Blankj/RxBu… - 以
PublishSubject
构建的RxBus: github.com/YoKeyword/R…
以 PublishSubject
为例,其中一种“发布/订阅”的实现如:
// 创建总线
Subject<Object> rxBus = PublishSubject.create().toSerialized();
// 订阅
Disposable disposable = rxBus
.ofType(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println("subscribe accept: " + s);
}
});
// 发布
rxBus.onNext("hello");
rxBus.onNext(100);
// 取消订阅
// 如果有多个 disposable,可以添加到 CompositeDisposable,统一取消订阅
disposable.dispose();
为方便分析,以上代码没有封装方法。
输出如下:
subscribe accept: hello
可以看出,观察者只处理了String类型的消息,而没有处理整数类型的消息。
我们来分析一下如上代码。
PublishSubject.create().toSerialized()
的实现如下:
// Subject
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<>(this);
}
}
// PublishSubject
public final class PublishSubject<T> extends Subject<T> {
public static <T> PublishSubject<T> create() {
return new PublishSubject<>();
}
}
最终返回一个套了SerializedSubject
的PublishSubject
,典型的装饰者模式。
类似 new BufferedOutputStream(new FileOutputStream("fileName"))
, 在原有功能的基础上再叠加处理。
这里套SerializedSubject
的目的,是增加“串行”发布的特性,避免onNext
被多线程同时调用时出现线程安全问题。
PublishSubject
既是 Observable 又是 Observer (既可以被订阅,又可以发布) 。
ofType
和subscribe
函数
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
public final <@NonNull U> Observable<U> ofType(@NonNull Class<U> clazz) {
return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
public final Observable<T> filter(@NonNull Predicate<? super T> predicate) {
return RxJavaPlugins.onAssembly(new ObservableFilter<>(this, predicate));
}
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
public final void subscribe(@NonNull Observer<? super T> observer) {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
}
}
- 调用
ofType
函数生成了执行isInstanceOf
方法的FilterObserver
,其作用是过滤指定类型的数据。 这个例子中,订阅了String
类型,所以Integer
类型的事件没有被回调。 isInstanceOf
方法底层用的是Class
的isInstance
方法,类似于EventBus的事件继承(订阅父类事件可接收子类事件)。- 订阅时调用
subscribe
, 会交替执行Observable
链路上的subscribe
和subscribeActual
方法,一路回溯到“源头”(这个例子中是PublishSubject
)。
而 PublishSubject
的 subscribeActual
方法,则是将 Observer
添加到队列。
public final class PublishSubject<T> extends Subject<T> {
final AtomicReference<PublishDisposable<T>[]> subscribers;
@Override
protected void subscribeActual(Observer<? super T> t) {
PublishDisposable<T> ps = new PublishDisposable<>(t, this);
t.onSubscribe(ps);
if (add(ps)) {
if (ps.isDisposed()) {
remove(ps);
}
} else {
// ..,,,,
}
}
boolean add(PublishDisposable<T> ps) {
for (;;) {
PublishDisposable<T>[] a = subscribers.get();
int n = a.length;
PublishDisposable<T>[] b = new PublishDisposable[n + 1];
// CopyOnWrite
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
public void onNext(T t) {
for (PublishDisposable<T> pd : subscribers.get()) {
pd.onNext(t);
}
}
}
在 PublishSubject
的onNext
方法中,直接遍历 subscribers
,调用其元素的onNext
方法。
这时候可以看到,onNext
方法执行了差不多和注册时相反的调用链,直到调用链末尾的订阅者的回调方法(accept
)。
从上面的分析可以看到,RxBus 也是订阅/发布的模式。
当然,和EventBus,有一些区别。
比如,在结构上:EventBus是 Map<Event, List<Subscription>>
, 而RxBus则是List<Subscribers>
。
在发布事件时: EventBus 能够索引到关注事件的订阅者列表; RxBus 则需要遍历所有订阅者,由订阅者检查是否需要处理该事件。
4.3 LightEventBus
EventBus 的源码中,查找方法花费了相当多的代码,同时拖慢EventBus的性能。 虽然后来增了注解处理器来支持加速方法查找,但又会引入编译耗时和启动耗时等负面作用。 如果去掉方法查找,换用其他的定义订阅方法的方式,那实现就简单很多了。
基于此,我实现了一个简化版的 EventBus,项目链接:LigthEventBus
相比原版EventBus, 主要变更了订阅方法相关的API(包括方法定义和注册)。
原版的订阅方法类:
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
}
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
}
为了简化使用,在实现上可以做如下简化:
- 订阅方法不需要定义成某个类的方法,可以一个方法接口(lambda形式)替代。
- 弱化了订阅者的概念(去掉
subscriber
),注册时只需要传入方法列表,也不用考虑继承等复杂因素。
简化后,“订阅方法”定义如下:
// (event: T) -> Unit 翻译成Java后,是一个名为 Function1 的接口类型
typealias Action<T> = (event: T) -> Unit
class EventHandler<T>( // 对应 SubscriberMethod
val eventType: Class<*>,
val threadMode: ThreadMode,
val sticky: Boolean,
val priority: Int,
val action: Action<T> // 对应 Method
) {
companion object {
// 增加一个静态方法,方便构建实例 (Kotlin语法糖)
inline fun <reified T> create(
threadMode: ThreadMode = ThreadMode.POSTING,
sticky: Boolean = false,
priority: Int = 0,
noinline action: Action<T>
): EventHandler<T> {
return EventHandler(T::class.java, threadMode, sticky, priority, action)
}
}
}
因为不再使用 Method
的概念,故而直接用lambda形式的方法接口替代原来的“方法”,并命名为Action
。
相应地,方法的处理,定于为 EventHandler
(对应原版的SubscriberMethod
)。
完整的 LightEventBus 实现,可参考:EventBus.kt
用法如下:
// 事件定义
data class NormalEvent(val time: String)
// 订阅方法列表
private val handlers: List<EventHandler<*>> =
listOf(
// 订阅方法
EventHandler.create<NormalEvent>(threadMode = ThreadMode.MAIN) { event ->
Log.d( "TAG" , "event: ${event::class.simpleName} " )
}
)
// 订阅
EventBus.getDefault().register(handlers)
// 发送事件
EventBus.getDefault().post(NormalEvent(time))
// 取消订阅
EventBus.getDefault().unregister(handlers)
- 事件的定义和事件的发布,和原版完全相同。
- 订阅/取消订阅,和原版EventBus类似,不同之处仅在于,原版传入的订阅者(订阅方法所在的类),简化版传入的是方法列表。
4.4 性能对比
测试代码:Benchmark.kt 测试设备:Huawei P30 pro 测试方式:冷启动,记录首次结果(各阶段的耗时,时间单位:ms)
测试代码是由ksp生成,在 Benchmark.kt 中不方便查看各事件总线的调用,目录 example 中有单个事件的测试样例。 其中,订阅者都是直接继承Object的类,没有深层次的类继承,没有多余的方法(非订阅方法)。
分别进行两批测试,结果如下:
事件数量:100
方式 | 准备 | 注册 | 发送 | 取消注册 |
---|---|---|---|---|
Index-EventBus | 14.9 | 4.1 | 3.1 | 0.4 |
Reflection-EventBus | 0.8 | 8.7 | 1.6 | 0.4 |
LiveEventBus | 0.6 | 7.1 | 1.1 | 1.7 |
RxBus | 17.4 | 3.5 | 4.4 | 0.3 |
LightEventBus | 0.6 | 0.4 | 1.0 | 0.2 |
事件数量:200
方式 | 准备 | 注册 | 发送 | 取消注册 |
---|---|---|---|---|
Index-EventBus | 21.4 | 7.0 | 5.4 | 0.8 |
Reflection-EventBus | 1.2 | 10.4 | 3.4 | 0.6 |
LiveEventBus | 0.8 | 7.2 | 1.9 | 2.0 |
RxBus | 18.1 | 6.0 | 9.1 | 0.8 |
LightEventBus | 1.3 | 0.9 | 1.9 | 0.5 |
备注:
Index-EventBus是EventBus使用“订阅索引”下的测试结果; Reflection-EventBus是EventBus使用反射查找方法下的测试结果。
测试结果解析(推测,不一定完全符合):
- EventBus使用“订阅索引”,注册时比用反射快一些,但是准备阶段(执行addIndex)则相对耗时;
- 以上结果,EventBus的反射方式的发送比“订阅索引”的要快,大概是因为JIT的缘故,因为两者的反射是同一套代码; 如果只执行Reflection-EventBus而不执行Index-EventBus,则发送时间是差不多的。 这里我偷了一下懒,直接放在一起执行了。
- LiveEventBus的发送事件相对EventBus较快, 因为EventBus默认情况下使用“事件继承”,发送事件时会递归查找事件的父类和接口,然后遍历所有支持的类型,去检索订阅方法。
- RxBus准备阶段相对较慢,因为RxBus基于RxJava,准备阶段要加载大量的类,故而相对耗时. RxBus的发送相对较慢,一是要先线性遍历,二是遍历过程中要做类型判断。
- LightEventBus的发送默认不使用事件继承,所以发送速度也比EventBus快。 LightEventBus的注册阶段不需要查找方法,所以比EventBus要快很多。
五、总结
本文主要分析了EventBus的用法、设计、实现等各方面的内容。 同时简单分析和对比了其他事件总线的用法、实现、性能等内容。 希望这些整理对大家有所帮助。 文中如有表述不对之处,欢迎交流讨论。
转载自:https://juejin.cn/post/7379831020495749157