Spring Event (Part 2)

Author: 站长

This article picks up where Spring Event (Part 1) left off and extends the discussion.

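1. Spring Payload Event

Spring payload event class (available since Spring 4.2): org.springframework.context.PayloadApplicationEvent

  • Mainly used inside Spring to simplify event publishing, so the caller can focus on the payload object rather than on an event wrapper
  • Extends ApplicationEvent and implements Spring's generic-type interface ResolvableTypeProvider
  • Publishing method: ApplicationEventPublisher#publishEvent(java.lang.Object)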
package org.springframework.context;

import org.springframework.core.ResolvableType;
import org.springframework.core.ResolvableTypeProvider;
import org.springframework.util.Assert;

/**
 * An {@link ApplicationEvent} that carries an arbitrary payload.
 *
 * <p>Mainly intended for internal use within the framework.
 *
 * @author Stephane Nicoll
 * @since 4.2
 * @param <T> the payload type of the event
 */
@SuppressWarnings("serial")
public class PayloadApplicationEvent<T> extends ApplicationEvent implements ResolvableTypeProvider {

   private final T payload;


   /**
    * Create a new PayloadApplicationEvent.
    * @param source the object on which the event initially occurred (never {@code null})
    * @param payload the payload object (never {@code null})
    */
   public PayloadApplicationEvent(Object source, T payload) {
      super(source);
      Assert.notNull(payload, "Payload must not be null");
      this.payload = payload;
   }


   @Override
   public ResolvableType getResolvableType() {
      return ResolvableType.forClassWithGenerics(getClass(), ResolvableType.forInstance(getPayload()));
   }

   /**
    * Return the payload of the event.
    */
   public T getPayload() {
      return this.payload;
   }

}

PayloadApplicationEvent example:

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ApplicationListenerDemo implements ApplicationEventPublisherAware {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Register the bootstrap class ApplicationListenerDemo as a Configuration Class
        context.register(ApplicationListenerDemo.class);
        context.addApplicationListener(event -> println("ApplicationListener - received Spring event: " + event));
        // Start the Spring application context
        context.refresh();
        // Close the Spring application context
        context.close();
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        // Publish a plain object; Spring wraps it in a PayloadApplicationEvent
        applicationEventPublisher.publishEvent("Hello,World");
        // Publish a custom subclass of PayloadApplicationEvent
        applicationEventPublisher.publishEvent(new MyPayloadApplicationEvent<>(this, "Hello,World"));
    }

    // Declared with a type parameter T (rather than one named "String", which would shadow java.lang.String).
    // The class keeps one type variable to match the single generic that getResolvableType()
    // passes to ResolvableType.forClassWithGenerics(getClass(), ...).
    static class MyPayloadApplicationEvent<T> extends PayloadApplicationEvent<T> {
        private static final long serialVersionUID = -2607517735941218378L;

        public MyPayloadApplicationEvent(Object source, T payload) {
            super(source, payload);
        }
    }

    private static void println(Object printable) {
        System.out.printf("[Thread: %s] : %s\n", Thread.currentThread().getName(), printable);
    }
}

Output:

[Thread: main] : ApplicationListener - received Spring event: org.springframework.context.PayloadApplicationEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@439f5b3d, started on Thu Jan 19 09:25:26 CST 2023]
[Thread: main] : ApplicationListener - received Spring event: org.geekbang.thinking.in.spring.event.ApplicationListenerDemo$MyPayloadApplicationEvent[source=org.geekbang.thinking.in.spring.event.ApplicationListenerDemo@57536d79]
[Thread: main] : ApplicationListener - received Spring event: org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@439f5b3d, started on Thu Jan 19 09:25:26 CST 2023]
[Thread: main] : ApplicationListener - received Spring event: org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@439f5b3d, started on Thu Jan 19 09:25:26 CST 2023]

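Conclusions:

  • PayloadApplicationEvent is mainly intended for Spring's internal use; there is usually no need to subclass it
  • ApplicationEventPublisher#publishEvent(java.lang.Object) is the direct entry point to this capability: any object that is not already an ApplicationEvent is wrapped in a PayloadApplicationEvent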
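
To make the wrapping behaviour concrete, here is a minimal plain-Java sketch of what happens to a non-event argument. SketchEvent, SketchPayloadEvent and SketchPublisher are hypothetical stand-ins written for this illustration, not Spring classes; the real logic lives in the application context and also resolves generic types, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for ApplicationEvent: an event that remembers its source.
class SketchEvent {
    private final Object source;

    SketchEvent(Object source) {
        this.source = Objects.requireNonNull(source, "Source must not be null");
    }

    Object getSource() {
        return source;
    }
}

// Hypothetical stand-in for PayloadApplicationEvent: an event carrying an arbitrary payload.
class SketchPayloadEvent<T> extends SketchEvent {
    private final T payload;

    SketchPayloadEvent(Object source, T payload) {
        super(source);
        this.payload = Objects.requireNonNull(payload, "Payload must not be null");
    }

    T getPayload() {
        return payload;
    }
}

// Hypothetical publisher: events pass through unchanged, anything else is wrapped.
class SketchPublisher {
    private final List<Consumer<SketchEvent>> listeners = new ArrayList<>();

    void addListener(Consumer<SketchEvent> listener) {
        listeners.add(listener);
    }

    void publishEvent(Object event) {
        SketchEvent toSend;
        if (event instanceof SketchEvent) {
            toSend = (SketchEvent) event;
        } else {
            // The publisher itself becomes the event source, the object becomes the payload
            toSend = new SketchPayloadEvent<>(this, event);
        }
        for (Consumer<SketchEvent> listener : listeners) {
            listener.accept(toSend);
        }
    }
}

class PayloadWrappingSketch {
    public static void main(String[] args) {
        SketchPublisher publisher = new SketchPublisher();
        publisher.addListener(e -> System.out.println(e.getClass().getSimpleName()));
        publisher.publishEvent("Hello,World");               // a plain object: wrapped
        publisher.publishEvent(new SketchEvent("a source")); // already an event: passed through
    }
}
```

This mirrors the demo output above, where the plain String arrives as a PayloadApplicationEvent whose source is the application context.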

2. Custom Spring Events

  • Extend ApplicationEvent
  • Implement ApplicationListener
  • Register the ApplicationListener
import org.springframework.context.ApplicationEvent;

public class MySpringEvent extends ApplicationEvent {
    private static final long serialVersionUID = -5690091601642311290L;

    public MySpringEvent(String message) {
        super(message);
    }

    // Narrow the return type: the source of this event is always the message String
    @Override
    public String getSource() {
        return (String) super.getSource();
    }
}

import org.springframework.context.ApplicationListener;

public class MySpringEventListener implements ApplicationListener<MySpringEvent> {
    @Override
    public void onApplicationEvent(MySpringEvent event) {
        System.out.printf("[Thread : %s] Received event : %s\n", Thread.currentThread().getName(), event);
    }
}

import org.springframework.context.support.GenericApplicationContext;

public class CustomizedSpringEventDemo {

    public static void main(String[] args) {
        GenericApplicationContext context = new GenericApplicationContext();
        // 1. Register the Spring event listener
        context.addApplicationListener(new MySpringEventListener());
        // 2. Start the Spring application context
        context.refresh();
        // 3. Publish the custom Spring event
        context.publishEvent(new MySpringEvent("Hello,World"));
        // 4. Close the Spring application context
        context.close();
    }
}

Output:

[Thread : main] Received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=Hello,World]

3. ApplicationEventPublisher and ApplicationEventMulticaster

3.1 Ways to inject ApplicationEventPublisher

  • Through the ApplicationEventPublisherAware callback interface
  • Through @Autowired ApplicationEventPublisher
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import javax.annotation.PostConstruct;
public class InjectingApplicationEventPublisherDemo implements ApplicationEventPublisherAware {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @PostConstruct
    public void init() {
        applicationEventPublisher.publishEvent(new MySpringEvent("The event from @Autowired ApplicationEventPublisher"));
    }
    public static void main(String[] args) {
        // Create an annotation-driven Spring application context
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Register the Configuration Class
        context.register(InjectingApplicationEventPublisherDemo.class);
        // Add the Spring event listener
        context.addApplicationListener(new MySpringEventListener());
        // Start the Spring application context
        context.refresh();
        // Close the Spring application context
        context.close();
    }
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        applicationEventPublisher.publishEvent(new MySpringEvent("The event from ApplicationEventPublisherAware"));
    }
}

Output (the Aware callback runs before the @PostConstruct method):

[Thread : main] Received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=The event from ApplicationEventPublisherAware]
[Thread : main] Received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=The event from @Autowired ApplicationEventPublisher]

3.2 Underlying implementation of ApplicationEventPublisher

  • Interface that provides the capability: org.springframework.context.event.ApplicationEventMulticaster
  • Abstract base class: org.springframework.context.event.AbstractApplicationEventMulticaster
  • Concrete implementation: org.springframework.context.event.SimpleApplicationEventMulticaster

3.3 Ways to look up ApplicationEventMulticaster

  • By name: look up the bean named "applicationEventMulticaster"
  • By type: look up a bean of type org.springframework.context.event.ApplicationEventMulticaster

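4. Synchronous and Asynchronous Spring Events

4.1 Based on the implementation class

Implementation class: org.springframework.context.event.SimpleApplicationEventMulticaster

Mode switch: the setTaskExecutor(java.util.concurrent.Executor) method

  • Default mode: synchronous (listeners run in the publishing thread)
  • Asynchronous mode: supply an Executor, for example a java.util.concurrent.ThreadPoolExecutor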
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
public class AsyncEventHandlerDemo {
    public static void main(String[] args) {
        GenericApplicationContext context = new GenericApplicationContext();
        // 1. Add the custom Spring event listener
        context.addApplicationListener(new MySpringEventListener());
        // 2. Start the Spring application context
        context.refresh(); // initializes the ApplicationEventMulticaster
        // Look up the ApplicationEventMulticaster by name and type
        ApplicationEventMulticaster applicationEventMulticaster =
                context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
        // Check whether the current ApplicationEventMulticaster is a SimpleApplicationEventMulticaster
        if (applicationEventMulticaster instanceof SimpleApplicationEventMulticaster) {
            SimpleApplicationEventMulticaster simpleApplicationEventMulticaster =
                    (SimpleApplicationEventMulticaster) applicationEventMulticaster;
            // Create the taskExecutor to switch to
            ExecutorService taskExecutor = newSingleThreadExecutor(new CustomizableThreadFactory("my-spring-event-thread-pool"));
            // Synchronous -> asynchronous
            simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor);
            // Handle ContextClosedEvent: shut the executor down when the context closes
            applicationEventMulticaster.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> {
                if (!taskExecutor.isShutdown()) {
                    taskExecutor.shutdown();
                }
            });
        }
        // 3. Publish the custom Spring event
        context.publishEvent(new MySpringEvent("Hello,World"));
        // 4. Close the Spring application context (publishes ContextClosedEvent)
        context.close();
    }
}

Output:

[Thread : my-spring-event-thread-pool1] Received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=Hello,World]
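
The switch itself is small enough to model without Spring. The following is a rough plain-Java sketch, assuming a hypothetical ExecutorMulticasterSketch class invented for this illustration: with no executor, listeners run on the publishing thread; once an executor is set, each listener call is handed to it. The thread name "sketch-event-thread" is likewise made up.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified multicaster: a null executor means synchronous delivery.
class ExecutorMulticasterSketch {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();
    private volatile Executor taskExecutor;

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    void multicast(Object event) {
        Executor executor = this.taskExecutor;
        for (Consumer<Object> listener : listeners) {
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // asynchronous hand-off
            } else {
                listener.accept(event);                         // runs on the caller's thread
            }
        }
    }
}

class SyncAsyncDispatchSketch {
    // Returns the names of the threads that handled the two events, in order.
    static List<String> run() {
        List<String> handlerThreads = new CopyOnWriteArrayList<>();
        ExecutorMulticasterSketch multicaster = new ExecutorMulticasterSketch();
        multicaster.addListener(event -> handlerThreads.add(Thread.currentThread().getName()));

        // No executor yet: handled on the calling thread
        multicaster.multicast("first event");

        // Switch to asynchronous delivery on a single named worker thread
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-event-thread"));
        multicaster.setTaskExecutor(pool);
        multicaster.multicast("second event");

        // Let the queued listener call finish before reading the result
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Listener did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return handlerThreads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

As in the Spring demo, the executor has to be shut down explicitly; otherwise its non-daemon worker thread keeps the JVM alive.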

4.2 Based on annotations

Annotation: @org.springframework.context.event.EventListener, with the following mode switch

  • Default mode: synchronous
  • Asynchronous mode: add @org.springframework.scheduling.annotation.Async
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.Executor;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

@EnableAsync // Enable Spring's asynchronous execution support
public class AnnotatedAsyncEventHandlerDemo {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // 1. Register the current class as a Configuration Class
        context.register(AnnotatedAsyncEventHandlerDemo.class);
        // 2. Start the Spring application context
        context.refresh();
        // 3. Publish the custom Spring event
        context.publishEvent(new MySpringEvent("Hello,World"));
        // 4. Close the Spring application context
        context.close();
    }

    @Async // Synchronous -> asynchronous
    @EventListener
    public void onEvent(MySpringEvent event) {
        System.out.printf("[Thread : %s] onEvent method received event : %s\n", Thread.currentThread().getName(), event);
    }

    // Executor used for @Async methods
    @Bean
    public Executor taskExecutor() {
        return newSingleThreadExecutor(new CustomizableThreadFactory("my-spring-event-thread-pool-a"));
    }
}

Output:

[Thread : my-spring-event-thread-pool-a1] onEvent method received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=Hello,World]

5. Spring Event Exception Handling

Error-handling interface: org.springframework.util.ErrorHandler

Where it is used:

  • Spring Event: SimpleApplicationEventMulticaster (supported since Spring 4.1)
  • Spring Scheduling:
    • org.springframework.scheduling.concurrent.ConcurrentTaskScheduler
    • org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;

public class AsyncEventHandlerDemo {

    public static void main(String[] args) {
        GenericApplicationContext context = new GenericApplicationContext();
        // 1. Add the custom Spring event listener
        context.addApplicationListener(new MySpringEventListener());
        // 2. Start the Spring application context
        context.refresh();
        // Look up the ApplicationEventMulticaster by name and type
        ApplicationEventMulticaster applicationEventMulticaster =
                context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
        // Check whether the current ApplicationEventMulticaster is a SimpleApplicationEventMulticaster
        if (applicationEventMulticaster instanceof SimpleApplicationEventMulticaster) {
            SimpleApplicationEventMulticaster simpleApplicationEventMulticaster =
                    (SimpleApplicationEventMulticaster) applicationEventMulticaster;
            // Handle listener exceptions here instead of letting them reach the publisher
            simpleApplicationEventMulticaster.setErrorHandler(e -> System.err.println("When a Spring event fails, cause: " + e.getMessage()));
        }
        // A second listener that always fails
        context.addApplicationListener((ApplicationListener<MySpringEvent>) event -> {
            throw new RuntimeException("Deliberately thrown exception");
        });
        // 3. Publish the custom Spring event
        context.publishEvent(new MySpringEvent("Hello,World"));
        // 4. Close the Spring application context (publishes ContextClosedEvent)
        context.close();
    }
}

Output:

[Thread : main] Received event : org.geekbang.thinking.in.spring.event.MySpringEvent[source=Hello,World]
When a Spring event fails, cause: Deliberately thrown exception
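
The effect of the error handler on the other listeners is easier to see in isolation. Below is a minimal plain-Java sketch, assuming a hypothetical ErrorHandlingMulticasterSketch class written only for this illustration (synchronous delivery, no Spring types): without a handler the first failing listener aborts the multicast and the exception reaches the publisher; with a handler the failure is reported and the remaining listeners still run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified multicaster with an optional error handler.
class ErrorHandlingMulticasterSketch {
    private final List<Consumer<Object>> listeners = new ArrayList<>();
    private Consumer<Throwable> errorHandler; // null: exceptions propagate to the publisher

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void setErrorHandler(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    void multicast(Object event) {
        for (Consumer<Object> listener : listeners) {
            if (errorHandler != null) {
                try {
                    listener.accept(event);
                } catch (Throwable err) {
                    errorHandler.accept(err); // report and continue with the next listener
                }
            } else {
                listener.accept(event);       // a failure here stops the loop
            }
        }
    }

    // Runs both scenarios and returns what happened, in order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ErrorHandlingMulticasterSketch multicaster = new ErrorHandlingMulticasterSketch();
        multicaster.addListener(event -> {
            throw new RuntimeException("boom");
        });
        multicaster.addListener(event -> log.add("second listener got " + event));

        // Scenario 1: no error handler
        try {
            multicaster.multicast("first");
        } catch (RuntimeException ex) {
            log.add("propagated: " + ex.getMessage());
        }

        // Scenario 2: with an error handler
        multicaster.setErrorHandler(err -> log.add("handled: " + err.getMessage()));
        multicaster.multicast("second");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the first scenario the second listener never sees the event, which is the main practical reason to configure an error handler when several independent listeners share a multicaster.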

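6. How Spring Event Listeners Work

Core class: org.springframework.context.event.SimpleApplicationEventMulticaster

  • Design pattern: an extension of the observer pattern
    • Observer: org.springframework.context.ApplicationListener (the multicaster plays the role of the subject)
    • Notification object: org.springframework.context.ApplicationEvent
  • Execution mode: synchronous / asynchronous
  • Exception handling: org.springframework.util.ErrorHandler
  • Generic type handling: org.springframework.core.ResolvableType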
/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.context.event;

import java.util.concurrent.Executor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.ResolvableType;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;

/**
 * Simple implementation of the {@link ApplicationEventMulticaster} interface.
 *
 * <p>Multicasts all events to all registered listeners, leaving it up to
 * the listeners to ignore events that they are not interested in.
 * Listeners will usually perform corresponding {@code instanceof}
 * checks on the passed-in event object.
 *
 * <p>By default, all listeners are invoked in the calling thread.
 * This allows the danger of a rogue listener blocking the entire application,
 * but adds minimal overhead. Specify an alternative task executor to have
 * listeners executed in different threads, for example from a thread pool.
 *
 * @author Rod Johnson
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @see #setTaskExecutor
 */
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

   @Nullable
   private Executor taskExecutor;

   @Nullable
   private ErrorHandler errorHandler;


   /**
    * Create a new SimpleApplicationEventMulticaster.
    */
   public SimpleApplicationEventMulticaster() {
   }

   /**
    * Create a new SimpleApplicationEventMulticaster for the given BeanFactory.
    */
   public SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
      setBeanFactory(beanFactory);
   }


   /**
    * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
    * to invoke each listener with.
    * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
    * executing all listeners synchronously in the calling thread.
    * <p>Consider specifying an asynchronous task executor here to not block the
    * caller until all listeners have been executed. However, note that asynchronous
    * execution will not participate in the caller's thread context (class loader,
    * transaction association) unless the TaskExecutor explicitly supports this.
    * @see org.springframework.core.task.SyncTaskExecutor
    * @see org.springframework.core.task.SimpleAsyncTaskExecutor
    */
   public void setTaskExecutor(@Nullable Executor taskExecutor) {
      this.taskExecutor = taskExecutor;
   }

   /**
    * Return the current task executor for this multicaster.
    */
   @Nullable
   protected Executor getTaskExecutor() {
      return this.taskExecutor;
   }

   /**
    * Set the {@link ErrorHandler} to invoke in case an exception is thrown
    * from a listener.
    * <p>Default is none, with a listener exception stopping the current
    * multicast and getting propagated to the publisher of the current event.
    * If a {@linkplain #setTaskExecutor task executor} is specified, each
    * individual listener exception will get propagated to the executor but
    * won't necessarily stop execution of other listeners.
    * <p>Consider setting an {@link ErrorHandler} implementation that catches
    * and logs exceptions (a la
    * {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_SUPPRESS_ERROR_HANDLER})
    * or an implementation that logs exceptions while nevertheless propagating them
    * (e.g. {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_PROPAGATE_ERROR_HANDLER}).
    * @since 4.1
    */
   public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
      this.errorHandler = errorHandler;
   }

   /**
    * Return the current error handler for this multicaster.
    * @since 4.1
    */
   @Nullable
   protected ErrorHandler getErrorHandler() {
      return this.errorHandler;
   }


   @Override
   public void multicastEvent(ApplicationEvent event) {
      multicastEvent(event, resolveDefaultEventType(event));
   }

   @Override
   public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
      ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
      Executor executor = getTaskExecutor();
      for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
         if (executor != null) {
            executor.execute(() -> invokeListener(listener, event));
         }
         else {
            invokeListener(listener, event);
         }
      }
   }

   private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
      return ResolvableType.forInstance(event);
   }

   /**
    * Invoke the given listener with the given event.
    * @param listener the ApplicationListener to invoke
    * @param event the current event to propagate
    * @since 4.1
    */
   protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
      ErrorHandler errorHandler = getErrorHandler();
      if (errorHandler != null) {
         try {
            doInvokeListener(listener, event);
         }
         catch (Throwable err) {
            errorHandler.handleError(err);
         }
      }
      else {
         doInvokeListener(listener, event);
      }
   }

   @SuppressWarnings({"rawtypes", "unchecked"})
   private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
      try {
         listener.onApplicationEvent(event);
      }
      catch (ClassCastException ex) {
         String msg = ex.getMessage();
         if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
            // Possibly a lambda-defined listener which we could not resolve the generic event type for
            // -> let's suppress the exception and just log a debug message.
            Log logger = LogFactory.getLog(getClass());
            if (logger.isTraceEnabled()) {
               logger.trace("Non-matching event type for listener: " + listener, ex);
            }
         }
         else {
            throw ex;
         }
      }
   }

   private boolean matchesClassCastMessage(String classCastMessage, Class<?> eventClass) {
      // On Java 8, the message starts with the class name: "java.lang.String cannot be cast..."
      if (classCastMessage.startsWith(eventClass.getName())) {
         return true;
      }
      // On Java 11, the message starts with "class ..." a.k.a. Class.toString()
      if (classCastMessage.startsWith(eventClass.toString())) {
         return true;
      }
      // On Java 9, the message used to contain the module name: "java.base/java.lang.String cannot be cast..."
      int moduleSeparatorIndex = classCastMessage.indexOf('/');
      if (moduleSeparatorIndex != -1 && classCastMessage.startsWith(eventClass.getName(), moduleSeparatorIndex + 1)) {
         return true;
      }
      // Assuming an unrelated class cast failure...
      return false;
   }

}
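
The ClassCastException branch in doInvokeListener exists because a listener's event type cannot always be recovered from its class. The sketch below illustrates the underlying idea with plain reflection only; it is not how ResolvableType is implemented, and TypedListenerSketch, StringListenerSketch and resolveEventType are hypothetical names introduced for this example. A named class keeps its generic interface signature, while a lambda-defined listener on common JVMs exposes only the raw interface, so the fallback type is returned.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical listener interface standing in for ApplicationListener<E>.
interface TypedListenerSketch<E> {
    void onEvent(E event);
}

// A named implementation: its generic interface signature is kept in the class file.
class StringListenerSketch implements TypedListenerSketch<String> {
    @Override
    public void onEvent(String event) {
        // no-op, only the declared type matters here
    }
}

class ListenerTypeResolutionSketch {
    // Returns the declared event type, or Object.class when it cannot be resolved.
    static Class<?> resolveEventType(Class<?> listenerClass) {
        for (Type type : listenerClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == TypedListenerSketch.class) {
                    Type argument = parameterized.getActualTypeArguments()[0];
                    if (argument instanceof Class) {
                        return (Class<?>) argument;
                    }
                }
            }
        }
        return Object.class;
    }

    public static void main(String[] args) {
        TypedListenerSketch<String> lambda = event -> { };
        // Named class: the type argument is visible through reflection
        System.out.println(resolveEventType(StringListenerSketch.class));
        // Lambda: only the raw interface is visible, so the fallback is used
        System.out.println(resolveEventType(lambda.getClass()));
    }
}
```

A multicaster that cannot resolve the type has to deliver every event to such a listener and tolerate the resulting ClassCastException, which is what the trace-logging branch above does.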

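7. Summary

Together, the two articles give a complete overview of Spring Event: its background, its features, and how to use it.

All Spring Event capabilities are ultimately provided through SimpleApplicationEventMulticaster.

  • The Spring application context, AbstractApplicationContext, delegates to SimpleApplicationEventMulticaster to process events in one place.
  • To extend Spring Event, you can subclass SimpleApplicationEventMulticaster and enhance it.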