likes
comments
collection
share

RocketMQ——RocketMQ搭建及问题解决

作者站长头像
站长
· 阅读数 9

RocketMQ简介

RocketMQ是一个开源的分布式消息和流数据平台,由阿里研发目前属于apache顶级项目:rocketmq.apache.org/ RocketMQ消息队列主要功能功能:引用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容

RocketMQ安装

下载地址

RocketMQ 4.5.1

系统要求

64位 Linux、unix或mac;
JDK版本1.8以上;

准备工作

由于下载的是zip压缩格式文件,因此在linux上安装unzip来进行解压

yum install -y unzip

使用unzip命令解压

unzip rocketmq-all-4.5.1-bin-release.zip

单机模式安装

启动nameserver

两种启动方式

  1. 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入./mqnamesrv进行启动
[root@node1 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

命令行提示启动成功

  1. 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入指令nohup sh bin/mqnamesrv &
[root@node1 bin]# nohup sh ./mqnamesrv &
[1] 68705

命令行显示启动的进程号,查看启动日志

namesrv.log 
...
The Name Server boot success. serializeType=JSON

日志提示启动成功

启动broker

两种启动方式:

  1. 直接运行mqbroker
./mqbroker -n localhost:9876
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

或者

./mqbroker
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON

提示启动成功

  1. 通过nohup指令启动
nohup sh ./mqbroker -n localhost:9876 &
[2] 69248

查看broker启动日志

tail -f ~/logs/rocketmqlogs/broker.log
...
INFO main - The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

提示启动成功

测试

运行源文件中以写好的测试demo

export NAMESRV_ADDR=localhost:9876
# 生产者
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=AC110001104B2B193F2D6EA72CC003E7...
# 消费者
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179...

单点消息队列启动成功

关闭消息队列

通过mqshutdown命令关闭消息队列,依次关闭nameserver和broker

sh mqshutdown broker
The mqbroker(69255) is running...
Send shutdown request to mqbroker(69255) OK
sh mqshutdown namesrv
The mqnamesrv(68711) is running...
Send shutdown request to mqnamesrv(68711) OK

问题

broker内存不够

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /component/rocketmq-all-4.5.1-bin-release/bin/hs_err_pid68880.log

解决方案

报错原因是虚拟机启动内存不够,修改bin下的服务启动脚本 runserver.sh 、runbroker.sh 中对于内存的限制,

runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

改成如下示例:

runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

集群搭建

通过两台虚拟机(192.168.108.128和192.168.108.129)模拟集群

启动NameServer

分别启动两台虚拟机的NameServer,启动方式与单机模式相同

启动broker

两台虚拟机的broker互为主备,即各自启动一个Master和一个Slave,在启动broker之前需要修改相关配置文件

修改配置文件

在目录conf/2m-2s-sync中(两主两从,同步更新),该目录下有四个配置文件,分别对应两个Master和两个Slave配置

├── broker-a.properties		#Master-a配置
├── broker-a-s.properties	#Slave-a配置
├── broker-b.properties		#Master-b配置
└── broker-b-s.properties	#Slave-b配置

在这里我们需要修改192.168.108.128的broker-a.properties和broker-b-s.properties,即128这台机器是名字为broker-a的broker的主节点,是名称为broker-b的broker的从节点;相对应的192.168.108.129是名字为broker-a的broker的从节点,是名称为broker-b的broker的主节点,因此修改129机器的broker-b.properties和broker-a-s.properties配置文件。修改如下:

修改机器1的配置文件(192.168.108.128)

master:broker-a.properties

namesrvAddr=192.168.108.128:9876;192.168.108.129:9876	#配置nameserver地址
listenPort=10911									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-a									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER								 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略

slave:broker-b-s.properties

namesrvAddr=192.168.108.128:9876;192.168.108.129:9876	#配置nameserver地址
listenPort=11011									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-b									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE									 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略
修改机器2的配置文件(192.168.108.129)

master:broker-b.properties

namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876	#配置nameserver地址	
listenPort=10911									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-b									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER								 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略

slave:broker-a-s.properties

namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876	#配置nameserver地址	
listenPort=11011									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-a									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE									 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							  #刷盘策略

启动四个broker


nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties & 

nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties & 
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.108.128:10911
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.108.128:10911
2019-07-25 14:00:20 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.108.129:9876 OK
2019-07-25 14:00:20 INFO brokerOutApi_thread_1 - register broker[0]to name server 192.168.108.128:9876 OK

名两行出现broker注册到nameserver成功,则集群搭建成功

问题

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.2:10911> failed

解决方案

出现该问题是因为虚拟机中搭建了docker产生了虚拟网卡过多,RocketMQ在访问docker网络时不通产生问题。 首先关闭docker服务

systemctl stop docker
systemctl is-enabled  docker         #查询是否自启动
systemctl disable  docker            #禁止自启动

然后重启虚拟机即可解决

测试

通过java代码对集群进行测试

生产者代码:

public class producer {
    public static void main(String[] args) throws Exception {
    	//创建生产者,并设置group
        DefaultMQProducer producer = new DefaultMQProducer("group");
        //设置nameserver
        producer.setNamesrvAddr("192.168.108.128:9876");
        //启动生产者
        producer.start();
        //创建发送的消息,发送100条数据
        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest", "TagA",("MQ Test"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送
            SendResult result = producer.send(msg);
            System.out.println(result);
        }
		//关闭生产者
        producer.shutdown();
    }
}

消费者代码

public class consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
        consumer.setNamesrvAddr("192.168.108.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(Thread.currentThread().getName() + " Receive Message: " + list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        System.out.println("consumer start...");
        consumer.start();
    }
}

测试结果

生产者 RocketMQ——RocketMQ搭建及问题解决 消费者 RocketMQ——RocketMQ搭建及问题解决

从控制台可以看到,在消费者端显示消息发送成功的数据,sendStatus均为ok,消费者端显示读取出来对应的消息,集群搭建成功

可视化管理平台

RocketMQ拥有一个可视化的管理平台,可以通过图形界面的方式查看集群情况、topic、生产者、消费者等具体信息 地址: RocketMQ扩展,项目拉下来后进入目录rocketmq-console 项目使用springboot搭建,在配置文件application.properties中修改配置rocketmq.config.namesrvAddr

rocketmq.config.namesrvAddr=192.168.108.128:9876;192.168.108.129:9876

启动项目,运行App.java。访问localhost:8080即可进入管理平台

RocketMQ——RocketMQ搭建及问题解决 RocketMQ——RocketMQ搭建及问题解决 RocketMQ——RocketMQ搭建及问题解决