likes
comments
collection
share

EventBus解析

作者站长头像
站长
· 阅读数 61

一、概述

事件通信(消息通信)是APP开发过程中常见的场景。 各级视图之间,视图和模型之间,再到模块与模块之间,都需要消息通信。

EventBus解析

若以现实中的通信类比,距离不同,可选的通信策略也不同:

  • 面对面:普通讲话;
  • 距离稍远:大声呼喊;
  • 几公里内:对讲机;
  • 更远距离:电信网络,卫星电话,互联网等。

在软件开发中,组件的耦合程度,可类比于现实通信的距离。 简而言之,就是要找到媒介,方能建立通信。 如果组件之间有直接的引用关系,通过方法调用,变量传递,接口回调等方式即可; 否则,则需要通过系统组件或者“全局变量”来通信,其中“全局变量”又有作用域之分。

除了通信距离之外,还有其他方面。

  • 通信模式:一对一、一对多、多对一、多对多。
  • 通信时效性:
    • 直接对话:对话时实时传递,对方会立即响应;
    • 发消息/邮件:消息会保留记录,对方可以不需要立即处理。

若只考虑APP内部的通信,能最大程度兼容通信距离、通信模式和通信模式通信时效等各个维度的,当属事件总线(EventBus)。

EventBus也有多种实现,本篇重点分析一下 greenrobotEventBus ,同时也简单分析一下其他事件总线。

二、EventBus简介

EventBus是适用于Android和Java的发布/订阅事件总线。

EventBus is a publish/subscribe event bus for Android and Java.

EventBus解析

EventBus 的 Github 主页比较简单,主要包含EventBus描述,基本用法,引入方式等。 官网中有更详细的介绍,包含各种用法和注意要点。

这里我们简单回顾一下EventBus的用法。

2.1 事件定义

EventBus 的事件定义是通过定义 class 来实现的,用class类型来标识事件,用类字段来装载数据。

public static class MessageEvent { /* Additional fields if needed */ }

这样定义,好处如下:

  • 发送事件和数据合二为一,事件只需传一个对象;
  • 用类字段装载数据,相比于传Map, 可变参数,Object等,数据定义更加清晰。

不过,这样定义事件的前提,是发布者和订阅者都能引用到事件类。 通常情况下,都能找到耦合点存放事件类; 如果找不到,可以用String, byte[]这样的万能类型作为事件类型。

2.2 声明订阅方法

只有声明了订阅方法的类,才能成为“订阅者”:没有订阅方法的类,注册时会抛异常。

声明订阅方法的规则如下:

  1. 方法需是 public,非static, 非abstract, 参数只有一个;
  2. 添加 @Subscribe注解。
@Subscribe(threadMode = ThreadMode.MAIN)  
public void onMessageEvent(MessageEvent event) {
    // Do something
}

@Subscribe注解有3层作用,

  1. 如果不使用注解处理器,则在运行时反射检索被@Subscribe注解的方法。
  2. 如果引入注解处理器,编译时会检索被@Subscribe注解且符合条件的方法,生成“订阅索引”。
  3. 通过注解参数,可以设定订阅方法的处理行为。

注解可选参数有:

参数类型含义
threadModeThreadMode线程模式:指定通过何种执行方式调用订阅方法。
stickyboolean粘性事件:当为true时,在注册方法可接收粘性事件(如果存在的话)。
priorityint优先级:事件可能对应多个订阅者,发布时按优先级和订阅时序的顺序发布。

这里我们先不展开,在后面实现分析部分再具体细说。

2.3 注册/反注册

EventBus.getDefault().register(subscriber);

EventBus.getDefault().unregister(subscriber);

注册和反注册通常和订阅者的生命周期相关: 可以在onCreate/onDestroy中定义, 也可以在onStart/onStop中定义, 或者如果订阅者生命周期和Application等同,也可以不用反注册。

2.4 发送事件

发送事件很简单,创建一个事件对象,传给post方法即可。

 EventBus.getDefault().post(new MessageEvent());

2.5 订阅索引

订阅索引是 EventBus 3 引入的特性,其目的是加速注册订阅(register)。 关于订阅索引的使用,可参考EventBus官网:subscriber-index

简单来说,创建订阅索引,需要执行如下步骤:

  1. 引入注解处理器

    apply plugin: 'kotlin-kapt' 
    
    dependencies {
       def eventbus_version = '3.2.0'
       implementation "org.greenrobot:eventbus:$eventbus_version"
       kapt "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
    }
    
    kapt {
       arguments {
           // 传入索引类的全路径名,当前模块添加了 @Subscribe 注解的方法会集成到此类中。
           arg('eventBusIndex', 'com.example.myapp.MyEventBusIndex')
       }
    }
    

    添加注解处理器后,编译时会检索被@Subscribe注解且符合条件的方法,生成“订阅索引”(一个包含了订阅者和订阅方法信息的类)。

  2. 在APP启动时,将注解处理器生成的索引类添加到 EventBus 中。

    其中,第2步又分两种情况:

    • 自行构建 EventBus 实例:
    EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
    
    • 注入默认EventBus实例
    EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
    // Now the default instance uses the given index. Use it like this:
    EventBus eventBus = EventBus.getDefault();
    
需要指出的是,注解处理器只能在编译时检索当前的编译单元(通常是module),而项目中通常会将不同的业务和功能划分到不同的module。 对应地,如果需要给全部方法添加索引,则每个模块都需要添加注解处理器,然后将各模块生成的索引注入到EventBus。
EventBus eventBus = EventBus.builder()
    .addIndex(new MyEventBusAppIndex())
    .addIndex(new MyEventBusLibIndex())
    .build();

添加索引不是必需的,如果不加索引,EventBus会通过反射检索订阅者的方法。

三、EventBus实现分析

3.1 基本要素

事件的订阅和发布,总体上有以下要素:

  • 角色:发布者 (Publisher)、订阅者(Subscriber)
  • 行为:订阅(register)、取消订阅(unregister)、 发布(post)

发布事件时,发布者的概念被弱化,事件(Event) 的概念被强化。

例如,登录账号后,发布“登录”事件:

 EventBus.getDefault().post(new LoginEvent());

具体开发中,我们可以认为调用了post方法的主体是发布者(可以是类,或者模块)。 但在上面这行代码中,我们看到了工具(EventBus)、动作(post)和事件(LoginEvent),而没有看到发布者。

EventBus的源码中,用名为 Subscription 的类来描述一组订阅。

Subscription 中包含了订阅者(subscriber) ,订阅者所关注的事件(event) ,事件对应的方法(Method) ,以及其他参数。

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}    
    
public class SubscriberMethod {
    final Method method;
    final Class<?> eventType;
    final ThreadMode threadMode;
    final boolean sticky;
    final int priority
}    

EventBus的订阅者和事件是多对多的关系:

  • 一个订阅者可能关注多个事件 ;
  • 一个事件可能被多个订阅者所关注。

EventBus允许(但不建议)在同一个类(订阅者)中注解两个同样参数(事件)的方法。

@Subscribe(threadMode = ThreadMode.MAIN)  
public void methodB(Event2 event) {
}

@Subscribe(threadMode = ThreadMode.MAIN)  
public void methodC(Event2 event) {
}

EventBus解析

EventBus 用两个 Map 来记录订阅关系(订阅者,事件,方法):

public class EventBus {
     // 事件 -> 订阅方法列表,用于发布事件时检索订阅方法并执行
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    
     // 订阅者 -> 关注的事件,用于取消订阅时索引订阅者所关注的事件
    private final Map<Object, List<Class<?>>> typesBySubscriber;
}

基于此,我们接下来简单分析一下EventBus订阅/发布的实现。

3.2 订阅/发布

3.2.1 基础架构

EventBus发布订阅的架构图如下。

关注点:

  • 三个方法: registerunregisterpost
  • 两个容器: subscriptionsByEventTypetypesBySubscriber

EventBus解析

精简后的源码如下:

public class EventBus {
    private final SubscriberMethodFinder subscriberMethodFinder;
    
    // 事件 -> 订阅方法列表
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    
    // 订阅者 -> 关注的事件
    private final Map<Object, List<Class<?>>> typesBySubscriber;
    
   // 注册订阅
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        // 检索订阅者的订阅方法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    
    // 记录订阅关系
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
     
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } 
        subscriptions.add(newSubscription);
    
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
    }
    
    // 取消订阅
    public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        }
    }
    
    // 移除订阅关系
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                // 同一个事件可能对应订阅者的多个方法,所以这里匹配后没有立即break
                if (subscription.subscriber == subscriber) {
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
            // 这里可以优化(个人观点):
            // 当 subscriptions.isEmpty()时,可执行 subscriptionsByEventType.remove(eventType),
            // 以免 subscriptionsByEventType 堆积空List,降低检索效率
        }
    }
    
    // 发布事件
    public void post(Object event) {
        postSingleEvent(event, postingState);
    }

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        // eventInheritance(事件继承) 默认是true, 可通过builder设置
        if (eventInheritance) {
            // 遍历子类以及父类和接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            postSingleEventForEventType(event, postingState, eventClass);
        }
    }

    private boolean postSingleEventForEventType(Object event, 
                        PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        // 如果有关联当前event的方法,则发送事件
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postToSubscription(subscription, event, postingState.isMainThread);
            }
            return true;
        }
        return false;
    }
    
    // 执行发送
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 根据线程模型,使用不同的发布策略(这个后面再分析)
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING: 
                invokeSubscriber(subscription, event);
                break;
            // 其他模式 ...
        }
    }

    void invokeSubscriber(Subscription subscription, Object event) {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    }    
}

EventBus的主体功能包含三个函数:

  • 订阅 (regiester)

    • 检索记录订阅者的方法,取其中添加了@Subscribe注解的方法;
    • 取方法参数的类型得到eventType, 取注解参数得到threadMode, sticky等参数;
    • 将“订阅者,事件,方法,以及其他参数”记录到 subscriptionsByEventTypetypesBySubscriber两个Map中。
  • 取消订阅(unregister)

    • 索引订阅者关注的事件列表,遍历事件,移除subscriptionsByEventType中关于此事件的订阅方法;
    • typesBySubscriber中移除订阅者以及所关联的事件列表。
  • 发布(post)

    • subscriptionsByEventType中索引Event所关联的Subscription列表;
    • 遍历Subscription列表,执行其方法。
    • 默认情况下,启用事件继承(eventInheritance) 。

3.2.2 事件继承

所谓“事件继承”,是指 :

如果方法订阅的事件类型是父类(或者接口),发布的事件类型是子类(或者实现),则方法能够收到该事件

EventBus实现这个,是基于面向对象的继承和多态规则,即:允许将 子类类型的引用 赋值给 父类类型的引用

举个例子:

假如有这样一个订阅方法,其订阅的类型是CharSequence

@Subscribe
fun test(param: CharSequence){
}

由于String实现了CharSequence,所以当有String类型的事件发送时,此方法会收到事件。

其实现原理如下:

public class EventBus {
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        // eventInheritance(事件继承) 默认是true, 可通过builder设置
        if (eventInheritance) {
            // lookupAllEventTypes 返回类型本身,以及父类和接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            // 遍历类型列表
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            postSingleEventForEventType(event, postingState, eventClass);
        }
    }
    
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                // 循环查找,直到根类(Object)为止
                while (clazz != null) {
                    eventTypes.add(clazz);
                    // 如果类实现了接口,则添加接口
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }
    
    // 递归查找接口的父类
    // 即:如果接口有继承关系,则连同接口的父类一并添加
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }
}

例如,发送一个String类型事件,可以得到如下事件类型列表:

EventBus解析

EventBus解析

这个例子中,只有类型继承的关系,没有接口的继承关系。

按照源码的实现,如果接口有父接口,则会一并添加到列表中。

eventInheritance 可在构建 EventBus 实例时指定,默认为true。

大家常用的默认实例(EventBus.getDefault)的eventInheritance 就是默认情况下就是true。

// EventBus.java
public class EventBus {
    private final boolean eventInheritance;
    
    EventBus(EventBusBuilder builder) {
        eventInheritance = builder.eventInheritance;
    }
}

// EventBusBuilder.java
public class EventBusBuilder {
    boolean eventInheritance = true;
    
    public EventBusBuilder eventInheritance(boolean eventInheritance) {
        this.eventInheritance = eventInheritance;
        return this;
    }
}

EventBus之所以默认设置为true, 大概是因为:如果不默认设置为true,估计很多人都不会注意到这个特性。 但大多数情况下,其实发送者是有明确的意图的,发送者只想发送确定的类型给对应订阅者。 例如,登录模块会定义类似LoginEvent之类的类型, 其发送事件时,只期望订阅了LoginEvent类型的订阅者接收,而不期望被关注 Object 类型的订阅者接收。

这里我有个想法: 将eventInheritance从全局变量改为post方法的参数,则“两难自解”。 同时,通过方法重载,不传eventInheritance参数的post方法,默认为false; 如果明确想要订阅父类类型的方法能接收到事件,则调用post(event, true)

// 备注:以下代码并非源码,仅为个人想法

public void post(Object event) {
    post(event, false);
}

public void post(Object event, boolean eventInheritance) {
    postSingleEvent(event, postingState, eventInheritance);
}

这个小节的最后,关于事件的定义,有以下建议:

  1. 发送的事件为简单的数据类型,不要有太多的继承和实现(效率层面);
  2. 尽量用自定义的类型,尽量不要用String之类等“万能类型”,以确保清晰的订阅关系(维护层面)。

关于第2点的说明:

使用自定事件类型,通过检索事件类型的引用(Find Usages), 可以看到谁是订阅者,谁是发送者; 而使用String类型,则因为String类型有太多的引用,而难以找到订阅关系。

3.2 查找方法

3.3.1 查找流程

上一节我们看到,在注册订阅时,需要查找订阅者的订阅方法:

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
}

查找方法的大概过程如下:

EventBus解析

查找订阅方法的源码如下。

可关注三个方法:

  • findSubscriberMethods:查找入口
  • findUsingInfo:优先用索引查找
  • findUsingReflection:直接反射查找
public class SubscriberMethodFinder {
    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    
    // 订阅信息索引
    private List<SubscriberInfoIndex> subscriberInfoIndexes;
    private final boolean ignoreGeneratedIndex;

    // subscriberInfoIndexes 由 EventBusBuilder 传入;
    // 如果没有设定,则 subscriberInfoIndexes 为空。
    SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean ignoreGeneratedIndex) {
        this.subscriberInfoIndexes = subscriberInfoIndexes;
        this.ignoreGeneratedIndex = ignoreGeneratedIndex;
    }

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
       
        // 查找方法有两条路径:用反射查找,用索引查找
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }

    // 用索引信息查找方法列表
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        // 从子类到根父类,递归查找
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            // 如果有订阅方法信息(subscriberInfo),则通过 subscriberInfo 获取方法列表
            // 如果没有,则还是通过反射查找
            if (findState.subscriberInfo != null) {
                // getSubscriberMethods 方法后面会分析,这里可以先跳过
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        // 获取 findState 中搜集的方法,并回收findState(用了池化技术)
        return getMethodsAndRelease(findState);
    }  
    
    // 从索引中获取订阅信息
    private SubscriberInfo getSubscriberInfo(FindState findState) {
        if (subscriberInfoIndexes != null) {
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }

    // 用反射查找方法列表
    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        // 从子类到根父类,递归查找
        while (findState.clazz != null) {
            findUsingReflectionInSingleClass(findState);
            if (!findState.skipSuperClasses) {
                findState.moveToSuperclass();
            }
        }
        return getMethodsAndRelease(findState);
    }

    private void findUsingReflectionInSingleClass(FindState findState) {
        // 通过反射获取方法列表,暂存到 findState 中 
        // 这部分先不分析,留到 3.2.3 小节 }
    } 
}

概括来讲,查找订阅方法的过程,有两条路径:

  1. 反射查找:通过 clazz.getDeclaredMethods() 获取类的方法列表,过滤其中符合条件的方法,并封装SubscriberMethod
  2. 订阅索引:通过 EventBusBuilder 传入的 subscriberInfoIndexes 的指引下构建 SubscriberMethod

3.2.2 订阅者继承

无论是走反射还是索引,都要遍历当前类型到根类型(Object),解析各层级的方法列表。

   while (findState.clazz != null) {
       // 查找当前类方法.....
       
       // 切换到父类
       findState.moveToSuperclass();
   }

之所以要解析订阅者的父类,是因为: 如果一个类的父类也声明了订阅方法,那注册这个订阅者时,也需要找到其父类的订阅方法。 例如:

open class SubscriberX {
    @Subscribe()
    fun functionX(event: A) {
        Log.d("Tag", "functionX event:${event::class.simpleName}")
    }
}

class SubscriberY : SubscriberX(){
    @Subscribe()
    fun functionY(event: B) {
        Log.d("Tag", "functionY event:${event::class.simpleName}")
    }
}

fun test() {
    val subscriberY = SubscriberY()
    EventBus.getDefault().register(subscriberY)
    EventBus.getDefault().post(A())
    EventBus.getDefault().post(B())
    EventBus.getDefault().unregister(subscriberY)
}

// 输出:
// functionX event:A
// functionY event:B

订阅者的继承事件的继承更简单一些: 事件的继承需要迭代查找父类,并且获取每一层类型所实现的接口,以及接口的父接口。 订阅者的继承只需要迭代查找父类, 因为接口中不能实现方法,不会存在订阅方法。

另外,为了加速查找,会过滤一些全限定名以java,javax,android,androidx 等起头的类型。

void moveToSuperclass() {
    if (skipSuperClasses) {
        clazz = null;
    } else {
        clazz = clazz.getSuperclass();
        String clazzName = clazz.getName();
        // Skip system classes, this degrades performance.
        // Also we might avoid some ClassNotFoundException (see FAQ for background).
        if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") ||
            clazzName.startsWith("android.") || clazzName.startsWith("androidx.")) {
                clazz = null;
            }
        }
}

3.2.3 反射查找

反射查找的源码如下:

public class SubscriberMethodFinder {
    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods,
            // especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // 获取当前类的所有方法,包括父类声明的方法
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // 遍历方法列表,查找其中满足条件的订阅方法。
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && 
                (modifiers & (Modifier.ABSTRACT | Modifier.STATIC)) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        // 取出注解携带的参数,连同 method 和 eventType, 封装到SubscriberMethod
                        findState.subscriberMethods.add(
                            new SubscriberMethod(
                                    method,
                                    eventType,
                                    subscribeAnnotation.threadMode(),
                                    subscribeAnnotation.priority(),
                                    subscribeAnnotation.sticky()
                            )
                        );
                    }
                }
            }
        }
    }
}

看起来代码不少,但逻辑比较简单。 就是获取类声明的方法,并挑选满足其中条件:

  1. public,非static, 非abstract, 参数只有一个;
  2. 声明了 @Subscribe注解。

但看起来逻辑简单的操作,有时候会有较大的消耗。 比如下面这段代码:

class MainActivity : AppCompatActivity() {
    @Subscribe
    fun test(param: CharSequence){
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        enableEdgeToEdge()
        setContentView(R.layout.activity_main)
        ViewCompat.setOnApplyWindowInsetsListener(findViewById(R.id.main)) { v, insets ->
        val systemBars = insets.getInsets(WindowInsetsCompat.Type.systemBars())
            v.setPadding(systemBars.left, systemBars.top, systemBars.right, systemBars.bottom)
            insets
        }

        findViewById<Button>(R.id.testBtn).setOnClickListener {
            EventBus.getDefault().post("hello")
        }

        EventBus.getDefault().register(this)
    }

    override fun onDestroy() {
        super.onDestroy()
        EventBus.getDefault().unregister(this)
    }
}

看起来只声明的三个方法,事实上其方法不止3三个:

EventBus解析

类中使用的一些的lambda方法,也会被 getDeclaredMethods 检索出来。

源码中关于 getDeclaredMethods 的注释是:

This is faster than getMethods, especially when subscribers are fat classes like Activities

重点在于 fat classes like Activities

这个例子中,MainActivity所继承的AppCompatActivity, 其全限定名为:androidx.appcompat.app.AppCompatActivity,正好是androidx包下的类,所以不需要继续迭代查找父类。 实际开发中,Activity/Fragment通常自定义了多层继承,每一层又可能有多个方法。

故此,不建议用 Activity/Fragment 作为订阅者,因为这种对象类继承层级较多,方法也多(EventBus源码中称之为"fat classes") , 无论是注解处理器的编译时分析,还是反射的运行时遍历,代价都很高。 可以通过创建只包含订阅方法的类来作为订阅者。

例如:

class MainActivity : AppCompatActivity() {
    inner class LoginEventHandler {
        @Subscribe(threadMode = ThreadMode.MAIN)
        fun onLogin(uid: Long) {
        }
        
        @Subscribe(threadMode = ThreadMode.MAIN)
        fun onLogout(uid: Long){
        }
    }

    private val loginEventHandler = LoginEventHandler()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        EventBus.getDefault().register(loginEventHandler)
    }

    override fun onDestroy() {
        super.onDestroy()
        EventBus.getDefault().unregister(loginEventHandler)
    }
}

如此,订阅者不会过深的继承层级,也没有庞大的方法列表,相比直接用Activity/Fragment作为订阅者,必然是要快许多的。

3.3.4 订阅索引

曾几何时,EventBus查找订阅方法是对比方法名来实现的。

方法名为onEventonEventXXX, XXX为ThreadMode

EventBus解析

后来订阅方法的定义换成了用注解声明,并通过注解传入threadMode,sticky,priority等参数。 再后来,引入了订阅索引(subscriber index) ,用以加速订阅方法的查找。

网上有种说法: “EventBus 3 可以在编译时获取信息,不需要在运行时反射”。

这种说法并不准确:

  1. 如果不用注解处理器&添加订阅索引,还是会通过反射查找方法;
  2. 即使添加了订阅索引,也只是不需要通过反射遍历订阅者的方法,创建订阅方法需要反射,发送事件时调用方法也需要反射。

例如,通过订阅索引创建订阅方法的代码如下:

public class SubscriberMethodInfo {
    final String methodName;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
}

public class SimpleSubscriberInfo implements SubscriberInfo {
    private final Class subscriberClass;
    
    // 注解处理器生成的索引类中,会创建 SimpleSubscriberInfo 对象
    public SimpleSubscriberInfo(Class subscriberClass) {
        this.methodInfos = methodInfos;
    }
    
    protected SubscriberMethod createSubscriberMethod(String methodName, Class<?> eventType, 
                                       ThreadMode threadMode, int priority, boolean sticky) {                        
        Method method = subscriberClass.getDeclaredMethod(methodName, eventType);
        return new SubscriberMethod(method, eventType, threadMode, priority, sticky);
    }
    
    @Override
    public synchronized SubscriberMethod[] getSubscriberMethods() {
        int length = methodInfos.length;
        SubscriberMethod[] methods = new SubscriberMethod[length];
        for (int i = 0; i < length; i++) {
            SubscriberMethodInfo info = methodInfos[i];
            methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
                    info.priority, info.sticky);
        }
        return methods;
    }
}

订阅索引只能提供基本信息,如方法名(methodName)和方法参数类型(eventType), Method 对象则需要在运行时反射获取 。

订阅索引虽然能带来订阅速度的提升,但也不是一本万利。 除了“添加注解处理器”和“分模块添加索引”比较麻烦之外,还有其他代价:

  1. 增加编译时间
  2. 增加冷启动时间

关于第2点的补充说明: 通过索引获取方法列表的方式,需在启动时添加索引,此过程中需要加载所有订阅者的类型; 而反射获取方法列表的方式,则不需要提前加载。

3.4 线程模式

线程模式这一块,有两个方面值得关注:

  1. 各种线程模式的含义和区别。
  2. 为了实现这些线程模式,EventBus做了些什么处理。

3.4.1 含义&区别

ThreadMode最初只有3种模式,几经迭代,如今有5种模式:

public enum ThreadMode {
    POSTING,
    MAIN,
    MAIN_ORDERED,
    BACKGROUND,
    ASYNC
}

前面“订阅/发布”的分析中,可以看到, 调用post方法后,经过一系列的运算之后,最终会调用到 postToSubscription 方法:

private void postToSubscription(Subscription subscription, 
                                Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // 这个分支在 Android 平台可以忽略,
                // 因为Android平台 mainThreadPoster != null
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
    }
}

一共有4种订阅方法的执行方式

  1. invokeSubscriber: 直接执行订阅方法
  2. mainThreadPoster: 将方法的执行 post 到主线程队列
  3. backgroundPoster: 将方法的执行 post 到串行执行的后台线程队列
  4. asyncPoster: 将方法的执行 post 到并发执行的后台线程队列

订阅方法的执行方式,取决于 订阅方法设定的线程模式发布事件的线程

汇总如下:

EventBus解析

真一个让人眼花缭乱!

在我看来,POSTING、MAIN_ORDERED、ASYNC 三种模式已足够应付大多数场景。 而且这三种模式有明确执行方式:invokeSubscribermainThreadPosterasyncPoster。 作为对比,MAINBACKGROUND 两种模式的执行方式飘忽不定(取决于发布事件的线程),增加了不确定性。

不过话说回来, MAINBACKGROUND 是 EventBus 最初支持的线程模式,为了向前兼容,这两种模式不能舍弃。

// EventBus项目,文件 ThreadMode.java 的第一条Git提交记录
public enum ThreadMode {
    /** Subscriber will be called in the same thread, which is posting the event. */
    PostThread,
    /** Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */
    MainThread,
    /** Subscriber will be called in a background thread */
    BackgroundThread
}

各种模式中,MAIN_ORDERED 模式出现的最晚。 关于 MAINMAIN_ORDERED 的区别,相信大家应该都很清楚。 相比于直接执行,post到队列后执行会有defer(推迟,延后)的效果。 比如视图执行onCreate时,View树还没布局完成,一些操作要等布局完成后才进行,这种情况下就可以用post推迟执行。

需要在主线程执行的订阅方法,有下面如下情形:

  1. 大部分不关注方法是在 “当前线程立即执行” 还是 “post到主线程队列后执行”;
  2. 不管发送事件的线程是否主线程,订阅方法都需要 “post到主线程队列后执行”;

对应处理:

  • “不确定发送的线程是否主线程” 的情况下,用 MAIN_ORDERED 模式不会出错。
  • 如果明确地知道 “发送事件的线程是主线程” ,并且需要 “当前线程立即执行” ,可以用 POSTING 模式。

虽说 POSTING、MAIN_ORDERED、ASYNC 三种模式就基本够用,但不是说 MAINBACKGROUND 不能用。 就好比,正交坐标系(原点,x,y,z)足够描述空间,但非正交坐标系也能够描述空间,有时候还更好用。 比如,“不在意发送线程是主线程还是子线程,只需要订阅方法在主线程执行”,则用 MAIN 模式就很合适。

3.4.2 线程处理

线程处理又可以分为两部分:

  1. 实现 poster(mainThreadPoster、backgroundPoster、asyncPoster)
  2. 并发处理 & 线程安全

Poster的实现,简单概括就是 :

  • mainThreadPoster:Handler + Looper.getMainLooper()
  • backgroundPoster/asyncPoster: 线程池 + 队列

“Handler、线程池、 队列”,这些大家都太熟悉了,同时考虑文章篇幅,就不展开细说了。

这里主要分析EventBus在并发处理线程安全方面的处理。

回顾一下EventBus订阅和发布的实现:

public class EventBus {
    // 事件 -> 订阅方法列表
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    
    // 订阅者 -> 关注的事件
    private final Map<Object, List<Class<?>>> typesBySubscriber;
    
   // 注册订阅
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            // 将订阅者和事件写入 subscriptionsByEventType & typesBySubscriber 
        }
    }

    // 取消订阅
    public synchronized void unregister(Object subscriber) {
        // 从 subscriptionsByEventType & typesBySubscriber 移除订阅者和事件
    }
    
    public void post(Object event) {
        postSingleEvent(event, postingState);
    }
    
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
         // 省略部分代码 ...... 
         postSingleEventForEventType(event, postingState, eventClass);
    }

    private boolean postSingleEventForEventType(Object event, 
                        PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postToSubscription(subscription, event, postingState.isMainThread);
            }
            return true;
        }
        return false;
    }
    
   private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING: 
                invokeSubscriber(subscription, event);
                break;
            // 其他模式 ...
        }
    }
}

我们这次的关注点不再是如何组织订阅者、事件和方法,而是各种线程模式下的并发和线程安全。

发布/订阅用到两个容器 subscriptionsByEventTypetypesBySubscriber , 这两个容器并非线程安全容器,而且可能被多个线程访问,故而需要加锁以确保读写不出现异常。

  • register方法中,在查找到订阅方法之后,剩余的代码都加锁 (12行)
  • unregister方法,给整个方法加锁 (18行)
  • post方法,只给 subscriptionsByEventType 加锁 (34行)

这里有一些关注点:

  1. 为什么 subscriptionsByEventTypetypesBySubscriber 不用线程安全容器?
  2. 为什么 subscriptionsByEventTypetypesBySubscriber 不分别加锁?
  3. 为什么post方法只给 subscriptionsByEventType``.get(eventClass)加锁(为什么不给整个方法加锁)?

先看第1个问题。

假设 subscriptionsByEventType 的容器换成 ConcurrentHashMap,然后注释掉 synchronized, 是否能实现“线程安全”?

答案是:不能。

按照如上假设,变更代码如下:

private final ConcurrentHashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } 
    subscriptions.add(newSubscription);
}

假如有不同的订阅者,同时在两个线程初次订阅同一个事件,同时执行如上代码的6行,都返回了null, 则两个线程会分别创建两个list, 但最终只有一个list可以添到subscriptionsByEventType(预期是两个list都能添加)。

上面这个case, 并不能实现“线程安全”(逻辑不正确)。

有经验的朋友估计已经看出来了,这个例子中,对subscriptionsByEventType有“读”和“写”两个操作, 其中 “写” 依赖 “读”,需同时对两个操作加锁,方能确保正确性。

第2个问题,和第1个问题有类似的考虑: subscriptionsByEventType 是记录“事件->订阅方法列表”的容器, typesBySubscriber 是记录“订阅者->事件列表”表的容器; unregister方法中,subscriptionsByEventType依赖typesBySubscriber,所以需要同时加锁。

第3个问题,和锁粒度相关: registerunregister方法中没有很耗时的操作,但post中可能有。 当订阅方法的执行方式是 invokeSubscriber时,订阅方法在执行post的线程上执行。 因此,post方法中,只给 subscriptionsByEventType``.get(eventClass)加锁,加锁范围不覆盖invokeSubscriber。 于是,invokeSubscriber不会占用对象锁,也就不会阻塞registerunregister方法来了。

同时我们也看到,从subscriptionsByEventType取出的,是一个记录了事件所对应的方法列表CopyOnWriteArrayList容器。 registerunregister方法会更新这个列表(写入和删除);而post方法在获到这个列表之后,需要遍历此列表。 因此,用CopyOnWriteArrayList,可避免遍历的过程中写入而引发ConcurrentModificationException

3.4.3 有序性

EventBus的post方法中,还有这么一段处理:

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
}

public class EventBus {
    private final ThreadLocal<PostingThreadState> currentPostingThreadState =
        new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
        };
        
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
    
        if (!postingState.isPosting) {
            postingState.isPosting = true;
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
            }
        }
    }
 }

EventBus通过ThreadLocal为每个执行post的线程创建了一个eventQueue,将在不同线程发送的事件,分组到各自线程的队列中。

EventBus解析

当一个线程执行post方法:

  1. 如果当前线程没有在发送事件(isPosting=false),则将事件放入队列,然后启动循环(从队列中获取事件来发送);
  2. 如果当前线程正在发送事件isPosting=true),事件会被放入eventQueue中,然后结束post方法。

什么情况下,会出现“调用post方法,而当前线程正在发送事件”? 由于不同线程的 PostingThreadStat是隔离的,那么只有一种可能 : 调用post方法之前,当前线程将isPosting设置为true了 ; 当前的post调用,发生在上一个Event的订阅方法中,且订阅方法是被直接执行的。

EventBus解析

POSTINGMAINBACKGROUND等模式,订阅方法都可能会被直接执行

比方说下面代码:

class Subscriber {
    @Subscribe(threadMode = ThreadMode.POSTING)
    fun function1(x: String) {
        EventBus.getDefault().post(Integer.valueOf(100))
        Log.d("Tag", "x:$x")
    }
    
    @Subscribe(threadMode = ThreadMode.POSTING)
    fun function2(y: Integer){
        Log.d("Tag", "y:$y")
    }
}

EventBus.getDefault().register(Subscriber())
EventBus.getDefault().post("hello")

假如没有队列(eventQueue),会走一个“深度递归,先进后出”的执行顺序(16行 -> 4行 -> 10行 -> 5行),先输出y再输出x; 在有队列的情况下,会等function1执行结束再触发function2(先进先出),先输出x再输出y。

EventBus解析

简而言之,ThreadLocal(eventQueue,isPosting) 的这套处理,可使得相同线程的事件能被有序地处理(先post的事件,先完成处理)。 不过,现实中在 "订阅方法被直接执行且订阅方法中调用post” 的情况其实很少见, 也就是,大多数情况下,eventQueue的事件个数不会大于1,所以eventQueue 的发挥空间很有限,只是兜底的作用。

更多地发挥作用,使处理“有序”的,是mainThreadPosterbackgroundPoster的队列。

注意区别: eventQueue 中存放的是 “事件”mainThreadPosterbackgroundPoster 的队列中存放的是 “订阅方法”

EventBus解析

实际开发中,大家其实较少关注事件处理的“有序性”,只要事件发生时能被处理到就行。 因此,可在有执行顺序的需求时,再来回顾一下线程模式相关机制。 另外,除了注意EventBus的线程模式外,在订阅者内部管理执行顺序也很重要。

3.5 粘性事件

所谓粘性事件,是指当事件发布后,即使在事件发布后才注册的观察者,也能够收到上一个已发布的事件。 粘性事件可以在某些情况下很有用,但也可能导致意外的行为,因此需要谨慎使用。

收到粘性事件的两个必要条件:

  1. 订阅方法的注解中,声明关注粘性事件:@Subscribe`` (sticky = ``true``)
  2. 曾经用 postSticky 来发送过对应事件。

相关代码如下:

public class EventBus {
    private final Map<Class<?>, Object> stickyEvents;

    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        
        // 省略订阅事件的代码......
        
        // 检查是否发送粘性事件
        if (subscriberMethod.sticky) {
            // 是否支持事件继承(默认为true)
            if (eventInheritance) {
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                 // 如果订阅方法注册的事件可以被粘性事件赋值(类型相同或者粘性事件是子类),则发送事件
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                // 仅获取相同类型的粘性事件进行发送
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
    
    private void checkPostStickyEventToSubscription(
        Subscription newSubscription, 
        Object stickyEvent
    ) {
        if (stickyEvent != null) {
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }
    
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        post(event);
    }
}

粘性事件的源码还是比较简单的,只有两个操作涉及粘性事件:

  1. postSticky:记录事件到 stickyEvents 容器,然后正常发送;
  2. register:在执行完订阅事件后,检查stickyEvents 容器中是否包含订阅方法所支持的事件,是则发送。

3.6 优先级

EventBus得优先级的实现很简单,注册时处理一下插入顺序,发送时按列表顺序遍历即可。

public class EventBus {
    // 事件 -> 订阅方法列表
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        }
    
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
    }
}

其核心处理,概括而言,就是找到方法列表后,从头开始比较,找到优先级小于当前订阅方法的位置,插入该方法的后面。 最终的效果是,一个事件对应的多个订阅方法:

  1. 若优先级不同,数值大者先被处理;
  2. 若优先级相同,先订阅者先被处理。

四、其他事件总线

Andriod平台大家比较熟悉“事件总线”工具,除了EventBus外,还有LiveEventBus(基于LiveData), RxBus(基于RxJava)等。 如果不限于“总线”的范畴,协程库的 SharedFlow/StataFlow 也是常见的事件通信工具。 基于内容聚焦的考虑,就不分析Flow了,简单分析一下其他的事件总线工具。

4.1 LiveDataBus

LiveEventBus是基于LiveData实现的事件总线工具。

LiveData用于事件通信,有一点需要留意:LiveData的事件通知,天然是sticky(粘性)的。 关于这一点,LiveEventBus关联的文章《Android消息总线的演进之路》也有提到:

“每注册一个新的订阅者,这个订阅者立刻会收到一个回调,即使这个设置的动作发生在订阅之前。”

“橘生淮南则为橘,生于淮北则为枳”。

LiveData 是 JetPack 组件之一,适用于Android的数据层构建,有数据变更通知,具备生命周期感知。 其注册时回调的特性,使得Activity/Fragment等因屏幕旋转而重建生命周期时可以恢复页面数据。 但如果用作事件通信的基础工具,就有一点“水土不服”了:很多情况下,只需要实时通知,不需要“粘性”事件。 如果有使用“消息总线”一类的工具,可以留意一下有多少场景是需要粘性事件的(应该不多)。

为了可以在淮北种出“橘”,大体有两种思路:

  • 盖温室:通过继承/组合,新增Wrapper,增加一些处理(SingeLiveData,UnPeek-LiveData)。
  • 改基因:通过反射,注册订阅时hook原生组件的部分状态 (LiveEventBus提到的方案)。

LiveEventBus 的核心代码:LiveEventBusCore

精简如下:

// LiveEventBus.java
public final class LiveEventBus {
    public static <T> Observable<T> get(@NonNull String key, @NonNull Class<T> type) {
        return LiveEventBusCore.get().with(key, type);
    }

    public static <T> Observable<T> get(@NonNull String key) {
        return (Observable<T>)get(key, Object.class);
    }

    public static <T extends LiveEvent> Observable<T> get(@NonNull Class<T> eventType){
        return get(eventType.getName(), eventType);
    }
}

// Observable.java
public interface Observable<T> {
    void post(T value);
    void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer);
    void removeObserver(@NonNull Observer<T> observer);
    // 其他方法
}

// LiveEventBusCore.java
public final class LiveEventBusCore {
    private final Map<String, LiveEvent<Object>> bus;
    
    // 内部类
    private class LiveEvent<T> implements Observable<T> {
        private final String key;
        private final LifecycleLiveData<T> liveData;
        
        LiveEvent(@NonNull String key) {
            this.key = key;
            this.liveData = new LifecycleLiveData<>(key);
        }
    
        public void post(T value) {
             liveData.setValue(value);
        }
        
        public void observe(@NonNull final LifecycleOwner owner, @NonNull final Observer<T> observer) {
           liveData.observe(owner, new ObserverWrapper<>(observer));
        }

        private void removeObserver(@NonNull Observer<T> observer) {
            liveData.removeObserver(observer);
        }
    }
    
    public synchronized <T> Observable<T> with(String key, Class<T> type) { 
        if (!bus.containsKey(key)) {
            bus.put(key, new LiveEvent<>(key));
        }
        return (Observable<T>) bus.get(key);
    }
}

LiveData的实现是经典的观察者模式,支持一对多的关系,但要构成“总线”,还需再加一层Map。

private final Map<String, LiveEvent<Object>> bus;

EventBus解析

但在key的抉择上,看起来作者是既想要String类型,又想要Class,但最终顾此失彼: LiveEventBusCorewith方法,同时传入 keytype , 但只用到key而没有用到type

另外,源码中好像没有看到其文章说的hook操作(反射修改mVersion)? hook操作在另外一个目录的代码中:LiveEventBus.java 在 MavenCentral 上发布的 LiveEventBus 版本中,是没有hook操作的。

4.2 RxBus

RxBus 有多种实现,有复杂的有简单的,都依赖于RxJava。 RxJava适合做事件总线的大约是 PublishSubjectFlowableProcessor。 前者可以获取Observable,后者可获取FlowableFlowable是更新一点的API,比Observable多了背压处理。

RxBus 实现举例:

PublishSubject 为例,其中一种“发布/订阅”的实现如:

// 创建总线
Subject<Object> rxBus = PublishSubject.create().toSerialized();

// 订阅 
Disposable disposable = rxBus
        .ofType(String.class)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                System.out.println("subscribe accept: " + s);
            }
        });

// 发布
rxBus.onNext("hello");
rxBus.onNext(100);

// 取消订阅
// 如果有多个 disposable,可以添加到 CompositeDisposable,统一取消订阅
disposable.dispose();

为方便分析,以上代码没有封装方法。

输出如下:

subscribe accept: hello

可以看出,观察者只处理了String类型的消息,而没有处理整数类型的消息。

我们来分析一下如上代码。

  1. PublishSubject.create().toSerialized() 的实现如下:
// Subject
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<>(this);
    }
}

// PublishSubject
public final class PublishSubject<T> extends Subject<T> {
    public static <T> PublishSubject<T> create() {
        return new PublishSubject<>();
    }
}

最终返回一个套了SerializedSubjectPublishSubject,典型的装饰者模式。 类似 new BufferedOutputStream(new FileOutputStream("fileName")), 在原有功能的基础上再叠加处理。 这里套SerializedSubject的目的,是增加“串行”发布的特性,避免onNext被多线程同时调用时出现线程安全问题。

EventBus解析

PublishSubject 既是 Observable 又是 Observer (既可以被订阅,又可以发布) 。

  1. ofTypesubscribe 函数
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
    public final <@NonNull U> Observable<U> ofType(@NonNull Class<U> clazz) {
        return filter(Functions.isInstanceOf(clazz)).cast(clazz);
    }
    
    public final Observable<T> filter(@NonNull Predicate<? super T> predicate) {
        return RxJavaPlugins.onAssembly(new ObservableFilter<>(this, predicate));
    }
    
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
    }
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
        subscribe(ls);
        return ls;
    }
    
    public final void subscribe(@NonNull Observer<? super T> observer) {
         observer = RxJavaPlugins.onSubscribe(this, observer);
         subscribeActual(observer);
    }
}
  • 调用 ofType 函数生成了执行 isInstanceOf 方法的FilterObserver,其作用是过滤指定类型的数据。 这个例子中,订阅了String类型,所以Integer类型的事件没有被回调。
  • isInstanceOf 方法底层用的是 ClassisInstance 方法,类似于EventBus的事件继承(订阅父类事件可接收子类事件)。
  • 订阅时调用 subscribe, 会交替执行Observable链路上的subscribesubscribeActual方法,一路回溯到“源头”(这个例子中是PublishSubject)。

EventBus解析

PublishSubjectsubscribeActual方法,则是将 Observer 添加到队列。

public final class PublishSubject<T> extends Subject<T> {
    final AtomicReference<PublishDisposable<T>[]> subscribers;
    
    @Override
    protected void subscribeActual(Observer<? super T> t) {
        PublishDisposable<T> ps = new PublishDisposable<>(t, this);
        t.onSubscribe(ps);
        if (add(ps)) {
            if (ps.isDisposed()) {
                remove(ps);
            }
        } else {
          // ..,,,,
        }
    }
    
    boolean add(PublishDisposable<T> ps) {
        for (;;) {
            PublishDisposable<T>[] a = subscribers.get();
            int n = a.length;
            PublishDisposable<T>[] b = new PublishDisposable[n + 1];
            // CopyOnWrite
            System.arraycopy(a, 0, b, 0, n);
            b[n] = ps;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }
    
    public void onNext(T t) {
        for (PublishDisposable<T> pd : subscribers.get()) {
            pd.onNext(t);
        }
    }
}

PublishSubjectonNext方法中,直接遍历 subscribers ,调用其元素的onNext方法。 这时候可以看到,onNext方法执行了差不多和注册时相反的调用链,直到调用链末尾的订阅者的回调方法(accept)。

EventBus解析

从上面的分析可以看到,RxBus 也是订阅/发布的模式。 当然,和EventBus,有一些区别。 比如,在结构上:EventBus是 Map<Event, List<Subscription>>, 而RxBus则是List<Subscribers>

EventBus解析

在发布事件时: EventBus 能够索引到关注事件的订阅者列表; RxBus 则需要遍历所有订阅者,由订阅者检查是否需要处理该事件。

4.3 LightEventBus

EventBus 的源码中,查找方法花费了相当多的代码,同时拖慢EventBus的性能。 虽然后来增了注解处理器来支持加速方法查找,但又会引入编译耗时和启动耗时等负面作用。 如果去掉方法查找,换用其他的定义订阅方法的方式,那实现就简单很多了。

基于此,我实现了一个简化版的 EventBus,项目链接:LigthEventBus

相比原版EventBus, 主要变更了订阅方法相关的API(包括方法定义和注册)。

原版的订阅方法类:

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}    

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
}  

为了简化使用,在实现上可以做如下简化:

  1. 订阅方法不需要定义成某个类的方法,可以一个方法接口(lambda形式)替代。
  2. 弱化了订阅者的概念(去掉subscriber),注册时只需要传入方法列表,也不用考虑继承等复杂因素。

简化后,“订阅方法”定义如下:

// (event: T) -> Unit 翻译成Java后,是一个名为 Function1 的接口类型
typealias Action<T> = (event: T) -> Unit

class EventHandler<T>( // 对应 SubscriberMethod
    val eventType: Class<*>,
    val threadMode: ThreadMode,
    val sticky: Boolean,
    val priority: Int,
    val action: Action<T>  // 对应 Method
) {
    companion object {
        // 增加一个静态方法,方便构建实例 (Kotlin语法糖)
        inline fun <reified T> create(
            threadMode: ThreadMode = ThreadMode.POSTING,
            sticky: Boolean = false,
            priority: Int = 0,
            noinline action: Action<T>
        ): EventHandler<T> {
            return EventHandler(T::class.java, threadMode, sticky, priority, action)
        }
    }
}

因为不再使用 Method的概念,故而直接用lambda形式的方法接口替代原来的“方法”,并命名为Action。 相应地,方法的处理,定于为 EventHandler (对应原版的SubscriberMethod )。 完整的 LightEventBus 实现,可参考:EventBus.kt

用法如下:

// 事件定义
data class NormalEvent(val time: String)

// 订阅方法列表
private val handlers: List<EventHandler<*>> = 
    listOf(
        // 订阅方法
        EventHandler.create<NormalEvent>(threadMode = ThreadMode.MAIN) { event ->
    Log.d( "TAG" , "event: ${event::class.simpleName} " )
        }
    )

// 订阅
EventBus.getDefault().register(handlers)

// 发送事件
EventBus.getDefault().post(NormalEvent(time)) 
 
 // 取消订阅
EventBus.getDefault().unregister(handlers)    
  • 事件的定义和事件的发布,和原版完全相同。
  • 订阅/取消订阅,和原版EventBus类似,不同之处仅在于,原版传入的订阅者(订阅方法所在的类),简化版传入的是方法列表。

4.4 性能对比

测试代码:Benchmark.kt 测试设备:Huawei P30 pro 测试方式:冷启动,记录首次结果(各阶段的耗时,时间单位:ms)

测试代码是由ksp生成,在 Benchmark.kt 中不方便查看各事件总线的调用,目录 example 中有单个事件的测试样例。 其中,订阅者都是直接继承Object的类,没有深层次的类继承,没有多余的方法(非订阅方法)。

分别进行两批测试,结果如下:

事件数量:100

方式准备注册发送取消注册
Index-EventBus14.94.13.10.4
Reflection-EventBus0.88.71.60.4
LiveEventBus0.67.11.11.7
RxBus17.43.54.40.3
LightEventBus0.60.41.00.2

事件数量:200

方式准备注册发送取消注册
Index-EventBus21.47.05.40.8
Reflection-EventBus1.210.43.40.6
LiveEventBus0.87.21.92.0
RxBus18.16.09.10.8
LightEventBus1.30.91.90.5

备注:

Index-EventBus是EventBus使用“订阅索引”下的测试结果; Reflection-EventBus是EventBus使用反射查找方法下的测试结果。

测试结果解析(推测,不一定完全符合):

  1. EventBus使用“订阅索引”,注册时比用反射快一些,但是准备阶段(执行addIndex)则相对耗时;
  2. 以上结果,EventBus的反射方式的发送比“订阅索引”的要快,大概是因为JIT的缘故,因为两者的反射是同一套代码; 如果只执行Reflection-EventBus而不执行Index-EventBus,则发送时间是差不多的。 这里我偷了一下懒,直接放在一起执行了。
  3. LiveEventBus的发送事件相对EventBus较快, 因为EventBus默认情况下使用“事件继承”,发送事件时会递归查找事件的父类和接口,然后遍历所有支持的类型,去检索订阅方法。
  4. RxBus准备阶段相对较慢,因为RxBus基于RxJava,准备阶段要加载大量的类,故而相对耗时. RxBus的发送相对较慢,一是要先线性遍历,二是遍历过程中要做类型判断。
  5. LightEventBus的发送默认不使用事件继承,所以发送速度也比EventBus快。 LightEventBus的注册阶段不需要查找方法,所以比EventBus要快很多。

五、总结

本文主要分析了EventBus的用法、设计、实现等各方面的内容。 同时简单分析和对比了其他事件总线的用法、实现、性能等内容。 希望这些整理对大家有所帮助。 文中如有表述不对之处,欢迎交流讨论。

转载自:https://juejin.cn/post/7379831020495749157
评论
请登录