likes
comments
collection
share

Java奇技淫巧: 观察者模式实现事件发布和监听

作者站长头像
站长
· 阅读数 3

Java奇技淫巧: 观察者模式实现事件发布和监听 我的博客: 夜航猩

前言

事件引擎的作用就是来管理各种事件.其核心内容包含事件监听,事件发布, 事件监听器注册.为什么要用到事件引擎呢?举个例子, 主角小明是某公司的后端开发,他负责系统内用户的一些行为数据埋点及统计,没用使用事件引擎之前,当需要对N个事件进行数据统计, 他需要对各个事件点位都进行编码实现,小明觉得这样的代码可读性不高,重复编码过多,且溯源麻烦.于是看了我的文章之后,回去手动实现了一个事件引擎,来管理各点位的统计业务.从代码层面进行了业务解耦,并且使代码更容易理解.那具体怎么做呢? 我们一起来手动实现一个简单实用的事件引擎

前言说到事件引擎的核心内容主要有 事件监听,事件发布, 事件监听器注册.接下来我们对三个核心部分分别进行实现,并在后面进行验证

事件监听器

其作用就是为了监听发布的事件, 假设需求为统计用户登录次数 ,那么我们就可以把登录看做一个事件, 统计次数则在监听器中实现,

为了更好的拓展, 我定义了一个接口和一个抽象类, 其中抽象类继承了接口, 用户需要实现自己的监听器,可以集成该抽象类对事件监听内容进行处理, 代码如下

Event ,首先需要定义一个事件基类,后面会说

/**
 * the event base class
 *
 * @author Anker
 */
public class Event {

    /**
     * sync or async
     */
    private boolean sync = true;

    public boolean isSync() {
        return sync;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }
}

EventListener.java 事件监听接口, 其定义了两个核心方法 listen() 和 triggerEvent()

/**
 * event listener
 *
 * @author Anker
 */
public interface EventListener {

    /**
     * 监听事件
     */
   void listen();

    /**
     * 触发事件
     *
     * @param event
     */
   void triggerEvent(Object event);
}

AbstractEventListener.java 事件监听抽象类 可以看到,该抽象类使用了泛型,之所以使用泛型,是考虑到当我继承该抽象类并实现时, 我的入参为我指定的入参类型,不需要再进行二次转换

/**
 * abstract event listener
 *
 * @author Anker
 * @param <E> event type
 */
public abstract class AbstractEventListener<T extends Event> implements EventListener {


    @Override
    public void listen() {
        EventHandlerHolder.bind(eventType(), this);
    }

    @Override
    public void triggerEvent(Object event) {
        onEvent((T) event);
    }

    /**
     * 事件类型
     *
     * @return
     */
    public abstract Class<? extends Event> eventType();

    /**
     * 执行触发事件时逻辑
     *
     * @param event 事件类型
     */
    public abstract void onEvent(T event);

}

该抽象类实现了接口的两个方法, listen() 的作用是将当前监听器与事件绑定,相当于事件监听器注册,triggerEvent()当该事件被发布时, 触发该事件,进行事件监听 然后又提供了两个抽象方法, eventType() 为当前事件监听器指定监听的事件类型, onEvent() 事件监听实际消费的方法

事件监听器注册

当用户继承了 AbstractEventListener 后, 我们需要将用户的事件监听器注册到某个容器中,以供事件消费时使用

监听器容器

EventListenerHolder.java, 事件监听器持有者

public class EventListenerHolder {

    /**
     * 事件处理器map
     */
    private static final Map<Class<? extends Event>, List<EventListener>> EVENT_LISTENERS = new ConcurrentHashMap<>(128);

    /**
     * 注册事件处理器
     *
     * @param eventType     事件类型,标识
     * @param listener      监听器
     */
    public static void bind(Class<? extends Event> eventType, EventListener listener) {
        if (!EVENT_LISTENERS.containsKey(eventType)){
            List<EventListener> eventListeners = CollUtil.newArrayList(listener);
            EVENT_LISTENERS.put(eventType, eventListeners);
        } else {
            EVENT_LISTENERS.get(eventType).add(listener);
        }
    }

    /**
     * 获取事件处理器
     *
     * @param eventId
     * @return
     */
    public static List<EventListener> getListener(Class<? extends Event> eventId) {
        return EVENT_LISTENERS.get(eventId);
    }

    /**
     * 清除事件处理器
     */
    public static void clear() {
        EVENT_LISTENERS.clear();
    }

}

其中初始化了一个Map来作为监听器注册的容器, key 为 事件类型, value 为 List<EventListener> 该事件对应的一批监听器

将监听器注册进容器中

定义好容器之后, 我们需要在Spring实例化监听器后,将实例化之后的监听器注册进容器,我们需要实现BeanPostProcessor 接口, 并重写 postProcessAfterInitialization 方法, 其作用就是在 bean 被初始化之后进行一些后置操作, 此时我们需要判断该bean是否实现了 EventListener, 如果是, 则执行其listen()方法, 将事件与监听器进行绑定

public class ListenerScanner implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof EventListener listener){
            listener.listen();
        }
        return bean;
    }
}

事件发布

实现了监听器和监听器注册, 我们接着来实现事件发布, 很好理解, 触发的事件需要一个入口来定义和触发监听,这个东西则为事件发布器, 其具体实现如下

/**
 * 默认事件发布处理器
 *
 * @author Anker
 */
public class DefaultEventPublisherProcess implements EventPublisher {

    private final AsyncEventProcessor asyncEventProcessor;

    public DefaultEventPublisherProcess(AsyncEventProcessor asyncEventProcessor) {
        this.asyncEventProcessor = asyncEventProcessor;
    }

   @Override
    public <T extends Event> void publish(T event) {
        List<EventListener> listeners = EventListenerHolder.getListener(event.getClass());
        if (CollUtil.isEmpty(listeners)) {
            return;
        }
        try {
            for (EventListener listener : listeners) {
                listener.triggerEvent(event);
            }
        } catch (Exception e) {
            throw new EventConsumerException(e);
        }
    }
}

是不是很简单, 可以看到其执行流程, 当时间发布后, 先去容器中查询是否存在其绑定的事件监听器, 如果没有, 则终止, 否则对事件进行广播

测试(如何使用)

定义事件类型

public class TriggerEvent extends Event{

    private String name;

    public TriggerEvent(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public boolean isSync() {
        return false;
    }
}

定义事件监听器

@Component
public class TriggerEventListener extends AbstractEventListener<TriggerEvent> {


    @Override
    public void onEvent(TriggerEvent event) {
        System.out.println(event.getName() + "1111");
    }

    @Override
    public Class<TriggerEvent> eventType() {
        return TriggerEvent.class;
    }
}

执行事件发布

@RestController
@RequiredArgsConstructor
@RequestMapping("demo")
public class DemoController {


    private final EventPublisher eventPublisher;



    @GetMapping("/trigger")
    public void trigger(){
        eventPublisher.publish(new TriggerEvent("触发事件"));
        System.out.println("接口被触发了");
    }

}

结果验证

接口被触发了
触发事件1111

总结

以上代码还可以进行更多优化, 比如实现异步事件消费, 甚至可以进行分布式改造, 但是核心内容就这三点. 其应用场景很多, 文中仅为示例.最好自行手动实现一遍,好帮助你进行理解. 文中不足的地方请指出,互相学习,互相交流.但是你喷我, 我也得喷你. 文明交流哦,