likes
comments
collection
share

SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

作者站长头像
站长
· 阅读数 17

开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第8天,点击查看活动详情

前言

我们在进行分布式项目开发时,如何在多个项目之间通信呢?这是一个很让人头疼的问题!分布式项目可能是由多个子项目来组成,而且不同的子项目可能还部署在不同的机器上,我们没办法通过方法调用的方式来进行信息的传递。这时候怎么办?

现在其实有很多的通信技术,比如RPC、MQ等,其中MQ消息队列就是一种很高效的常用通信工具。

所以在这上一章节中,壹哥 会给大家讲解在Spring Boot中整合ActiveMQ,实现消息的发送和接收。

一. ActiveMQ简介

接下来我们先来了解一下ActiveMQ,看看什么是ActiveMQ,以及ActiveMQ有哪些特点。

1. ActiveMQ概述

Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.

Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, 
comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.

ActiveMQ是由Apache出品的,一款流行的,能力强劲的开源消息总线,ActiveMQ实现了JMS消息协议。ActiveMQ是一个完全支持JMS 1.1 和 J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

2. ActiveMQ特性

1️⃣. 多种语言和协议编写客户端;

语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。

应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP;

2️⃣. 完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务);

3️⃣. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性;

4️⃣. 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上;

5️⃣. 支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA;

6️⃣. 支持通过JDBC和journal提供高速的消息持久化;

7️⃣. 从设计上保证了高性能的集群,客户端-服务器,点对点;

8️⃣. 支持Ajax;

9️⃣. 支持与Axis的整合;

🔟. 可以很容易得调用内嵌JMS provider,进行测试。

3. ActiveMQ的使用场景

1️⃣. 多个项目之间集成 (1) 跨平台; (2) 多语言; (3) 多项目。

2️⃣. 降低系统间模块的耦合度,解耦;

3️⃣. 流量削峰。

4. ActiveMQ的消息传递模式

SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

P2P (点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。
Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。
发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。

二. 发送消息的实现

了解完基本的理论之后,我们就来实操一把,实现消息的发送和接收。

1. 创建Web项目

首先按照之前的经验,我们创建一个SpringBoot的Web程序,具体过程略。

SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

2. 添加依赖包

在pom.xml文件中添加核心依赖包,如下所示。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

3. 创建application.yml配置文件

创建一个application.yml文件,在这里配置自己的ActiveMQ消息队列的信息。

#配置activemq
spring:
  activemq:
    #activemq的url
    broker-url: tcp://127.0.0.1:61616
    #用户名
    user: admin
    #密码
    password: admin
    pool:
      enabled: false #是否使用线程池
      max-connections: 100 #最大连接数
    #是否信任所有包
    packages:
      trust-all: true
  #默认情况下,activemq使用的是queue模式,如果要使用topic模式,必须设置为true
  jms:
    pub-sub-domain: true

4. 创建ActiveMQ的配置类

接着我们创建一个ActiveMQ的配置类,在这个类中配置连接工厂,消息监听器、消息队列等。

package com.yyg.boot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/14
 * @Description Description
 * //@EnableJms启用jms功能
 */
@Configuration
@EnableJms
public class ActivemqConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
        connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
        connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
        return connectionFactory;
    }

    /**
     * 实现监听queue
     */
    @Bean("jmsQueueListenerContainerFactory")
    public JmsListenerContainerFactory<?> queueContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //开启接收topic类型的消息
        factory.setPubSubDomain(false);
        return factory;
    }

    /**
     * 实现监听topic
     */
    @Bean("jmsTopicListenerContainerFactory")
    public JmsListenerContainerFactory<?> topicContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

    /**
     * 队列名称
     */
    @Bean("springboot.queue")
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue") ;
    }

    /**
     * Topic名称
     */
    @Bean("springboot.topic")
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic") ;
    }

}

5. 创建消息生产者的工具类

为了方便消息的发送,我创建一个消息发送者方面的工具类,在这个Producer类中,创建了几个发送消息的方法。

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/15
 * @Description 消息生产者
 */
@Slf4j
@Component
public class Producer {

    @Resource(name = "springboot.queue")
    private Queue queue;

    @Resource(name = "springboot.topic")
    private Topic topic;

    @Resource(name = "springboot.replyQueue")
    private Queue replyQueue;

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 发送消息,destination是发送到的目标队列,message是待发送的消息内容;
     */
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * 发送队列消息
     */
    public void sendQueueMessage(final String message) {
        sendMessage(queue, message);
    }

    /**
     * 发送Topic消息
     */
    public void sendTopicMessage(final String message) {
        sendMessage(topic, message);
    }

}

6. 定义消费消息的Consumer类

接下来还有创建一个用于接收消息,也就是消费消息的Consumer类。

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/15
 * @Description 消息的消费者
 */
@Slf4j
@Component
public class Consumer {

    /**
     * 监听Queue队列,queue类型
     */
    @JmsListener(destination="springboot.queue",
            containerFactory = "jmsQueueListenerContainerFactory")
    public void receiveQueue(String text){
        log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
    }

    /**
     * 监听Topic队列,topic类型,这里containerFactory要配置为jmsTopicListenerContainerFactory
     */
    @JmsListener(destination = "springboot.topic",
            containerFactory = "jmsTopicListenerContainerFactory")
    public void receiveTopic(String text) {
        log.warn(this.getClass().getName()+"-->收到的报文为:"+text);
    }
    
}

7. 创建Controller,发布消息

创建一个Controller类,定义2个发送消息的URL接口,分别按照点对点模式,发布者订阅者模式来发送消息。

package com.yyg.boot.web;

import com.yyg.boot.domain.User;
import com.yyg.boot.jms.Consumer;
import com.yyg.boot.jms.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/15
 * @Description Description
 */
@RestController
public class MsgController {

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @GetMapping("/sendQueue")
    public String sendQueueMsg() {
        User user = new User();
        user.setId(1L);
        user.setUsername("一一哥Queue");
        user.setPassword("123");
        producer.sendQueueMessage(user.toString());
        return "发送成功!";
    }

    @GetMapping("/sendTopic")
    public String sendTopicMsg() {
        User user = new User();
        user.setId(2L);
        user.setUsername("一一哥Topic");
        user.setPassword("123456");
        producer.sendTopicMessage(user.toString());
        return "发送成功!";
    }

}

8. 创建入口类

最后创建一个入口类,启动项目。

package com.yyg.boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ActiveMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActiveMQApplication.class, args);
    }

}

9. 项目代码结构

完整的项目结构如下图所示,各位可以参考创建。

SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

10. 启动项目进行测试

10.1 测试发送点对点类型的消息 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

这时候我们在队列中,可以看到成功的收到了消息。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送 在ActiveMQ中也可以看到出现了springboot.queue队列,并且队列中的消息已被消费掉。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

10.2 测试发送发布者订阅者类型的消息 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

这时候我们在Topic中,可以看到成功的收到了消息。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

在ActiveMQ中也可以看到出现了springboot.topic队列,并且队列中的消息已被消费掉。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

三. 回复消息的实现

我们在上面的基础之上,进一步实现消息回复的功能。

1. 改造ActivemqConfig类

在该类中添加一个用来接收消息的队列。

/**
* 回复队列名称
*/
@Bean("springboot.replyQueue")
public Queue queueReply() {
    return new ActiveMQQueue("springboot.replyQueue") ;
}

此时完整的ActivemqConfig代码如下所示:

package com.yyg.boot.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/14
 * @Description Description
 * //@EnableJms启用jms功能
 */
@Configuration
@EnableJms
public class ActivemqConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
        connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
        connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
        return connectionFactory;
    }

    /**
     * 实现监听queue
     */
    @Bean("jmsQueueListenerContainerFactory")
    public JmsListenerContainerFactory<?> queueContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //开启接收topic类型的消息
        factory.setPubSubDomain(false);
        return factory;
    }

    /**
     * 实现监听topic
     */
    @Bean("jmsTopicListenerContainerFactory")
    public JmsListenerContainerFactory<?> topicContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

    /**
     * 队列名称
     */
    @Bean("springboot.queue")
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue") ;
    }

    /**
     * Topic名称
     */
    @Bean("springboot.topic")
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic") ;
    }

    /**
     * 回复队列名称
     */
    @Bean("springboot.replyQueue")
    public Queue queueReply() {
        return new ActiveMQQueue("springboot.replyQueue") ;
    }

}

2. 改造Producer类

在Producer类中定义一个新的Queue类,并定义发送消息和消费消息的方法。

@Resource(name = "springboot.replyQueue")
private Queue replyQueue;

/**
* 发送队列的回复消息
*/
public void sendQueueMessageReply(String message) {
    sendMessage(replyQueue, message);
}

/**
* 生产者监听消费者的应答信息
*/
@JmsListener(destination = "replyTo.queue",containerFactory = "jmsQueueListenerContainerFactory")
public void consumerMessage(final String text) {
    log.warn("从replyTo.queue队列中收到的应答报文为:" + text);
}

此时完整的Producer类代码如下所示:

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/15
 * @Description 消息生产者
 */
@Slf4j
@Component
public class Producer {

    @Resource(name = "springboot.queue")
    private Queue queue;

    @Resource(name = "springboot.topic")
    private Topic topic;

    @Resource(name = "springboot.replyQueue")
    private Queue replyQueue;

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 发送消息,destination是发送到的目标队列,message是待发送的消息内容;
     */
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * 发送队列消息
     */
    public void sendQueueMessage(final String message) {
        sendMessage(queue, message);
    }

    /**
     * 发送Topic消息
     */
    public void sendTopicMessage(final String message) {
        sendMessage(topic, message);
    }

    /**
     * 发送队列的回复消息
     */
    public void sendQueueMessageReply(String message) {
        sendMessage(replyQueue, message);
    }

    /**
     * 生产者监听消费者的应答信息
     */
    @JmsListener(destination = "replyTo.queue",containerFactory = "jmsQueueListenerContainerFactory")
    public void consumerMessage(final String text) {
        log.warn("从replyTo.queue队列中收到的应答报文为:" + text);
    }
    
}

3. 改造Consumer类

在该类中添加接收消息,并且设置回复消息的方法。

/**
* 回复给生产者的应答信息
*/
@JmsListener(destination="springboot.replyQueue",containerFactory = "jmsQueueListenerContainerFactory")
@SendTo("replyTo.queue") //消费者应答后通知生产者
public String receiveQueueReply(String text){
    log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
    return "回复的信息为-->"+text;
}

此时完整的Consumer类代码如下所示:

package com.yyg.boot.jms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * @Author 一一哥Sun
 * @Date Created in 2020/4/15
 * @Description 消息的消费者
 */
@Slf4j
@Component
public class Consumer {

    /**
     * 监听Queue队列,queue类型
     */
    @JmsListener(destination="springboot.queue",
            containerFactory = "jmsQueueListenerContainerFactory")
    public void receiveQueue(String text){
        log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
    }

    /**
     * 监听Topic队列,topic类型,这里containerFactory要配置为jmsTopicListenerContainerFactory
     */
    @JmsListener(destination = "springboot.topic",
            containerFactory = "jmsTopicListenerContainerFactory")
    public void receiveTopic(String text) {
        log.warn(this.getClass().getName()+"-->收到的报文为:"+text);
    }

    /**
     * 回复给生产者的应答信息
     */
    @JmsListener(destination="springboot.replyQueue",containerFactory = "jmsQueueListenerContainerFactory")
    @SendTo("replyTo.queue") //消费者应答后通知生产者
    public String receiveQueueReply(String text){
        log.warn(this.getClass().getName()+ "-->收到的报文为:"+text);
        return "回复的信息为-->"+text;
    }

}

4. 重新运行,测试消息的回复功能

调用如下接口,测试消息回复功能。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

此时可以看到控制台输出如下信息,说明消息回复成功。 SpringBoot2.x系列教程34--SpringBoot整合ActiveMQ实现消息发送

结语

至此,我们实现了Spring Boot中如何整合ActiveMQ,实际上我们完全可以把消息的发送和消息的接收代码写在两个完全不同的项目中,依然可以实现消息的收发。

今日小作业:

在学生管理系统中,添加一个学生注册功能,在注册成功后,发送一个注册成功的消息,提示用户进行激活操作。

转载自:https://juejin.cn/post/7170834679669620749
评论
请登录