网站首页 > 博客文章 正文
RocketMQ简介
RocketMQ是一个开源的分布式消息和流数据平台,由阿里研发目前属于apache顶级项目:rocketmq.apache.org/ RocketMQ消息队列主要功能功能:引用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容
RocketMQ安装
下载地址
RocketMQ 4.5.1
系统要求
64位 Linux、unix或mac;
JDK版本1.8以上;
准备工作
由于下载的是zip压缩格式文件,因此在linux上安装unzip来进行解压
yum install -y unzip
使用unzip命令解压
unzip rocketmq-all-4.5.1-bin-release.zip
单机模式安装
启动nameserver
两种启动方式
- 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入./mqnamesrv进行启动
[root@node1 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
命令行提示启动成功
- 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入指令nohup sh bin/mqnamesrv &
[root@node1 bin]# nohup sh ./mqnamesrv &
[1] 68705
命令行显示启动的进程号,查看启动日志
namesrv.log
...
The Name Server boot success. serializeType=JSON
日志提示启动成功
启动broker
两种启动方式:
- 直接运行mqbroker
./mqbroker -n localhost:9876
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
或者
./mqbroker
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON
提示启动成功
- 通过nohup指令启动
nohup sh ./mqbroker -n localhost:9876 &
[2] 69248
查看broker启动日志
tail -f ~/logs/rocketmqlogs/broker.log
...
INFO main - The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:987
提示启动成功
测试
运行源文件中以写好的测试demo
export NAMESRV_ADDR=localhost:9876
# 生产者
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=AC110001104B2B193F2D6EA72CC003E7...
# 消费者
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179...
单点消息队列启动成功
关闭消息队列
通过mqshutdown命令关闭消息队列,依次关闭nameserver和broker
sh mqshutdown broker
The mqbroker(69255) is running...
Send shutdown request to mqbroker(69255) OK
sh mqshutdown namesrv
The mqnamesrv(68711) is running...
Send shutdown request to mqnamesrv(68711) OK
问题
broker内存不够
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /component/rocketmq-all-4.5.1-bin-release/bin/hs_err_pid68880.log
解决方案
报错原因是虚拟机启动内存不够,修改bin下的服务启动脚本 runserver.sh 、runbroker.sh 中对于内存的限制,
runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
改成如下示例:
runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
集群搭建
通过两台虚拟机(192.168.108.128和192.168.108.129)模拟集群
启动NameServer
分别启动两台虚拟机的NameServer,启动方式与单机模式相同
启动broker
两台虚拟机的broker互为主备,即各自启动一个Master和一个Slave,在启动broker之前需要修改相关配置文件
修改配置文件
在目录conf/2m-2s-sync中(两主两从,同步更新),该目录下有四个配置文件,分别对应两个Master和两个Slave配置
├── broker-a.properties #Master-a配置
├── broker-a-s.properties #Slave-a配置
├── broker-b.properties #Master-b配置
└── broker-b-s.properties #Slave-b配置
在这里我们需要修改192.168.108.128的broker-a.properties和broker-b-s.properties,即128这台机器是名字为broker-a的broker的主节点,是名称为broker-b的broker的从节点;相对应的192.168.108.129是名字为broker-a的broker的从节点,是名称为broker-b的broker的主节点,因此修改129机器的broker-b.properties和broker-a-s.properties配置文件。修改如下:
修改机器1的配置文件(192.168.108.128)
master:broker-a.properties
namesrvAddr=192.168.108.128:9876;192.168.108.129:9876 #配置nameserver地址
listenPort=10911 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-a #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
slave:broker-b-s.properties
namesrvAddr=192.168.108.128:9876;192.168.108.129:9876 #配置nameserver地址
listenPort=11011 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-b #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
修改机器2的配置文件(192.168.108.129)
master:broker-b.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址
listenPort=10911 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-b #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
slave:broker-a-s.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址
listenPort=11011 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-a #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
启动四个broker
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties &
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.108.128:10911
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.108.128:10911
2019-07-25 14:00:20 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.108.129:9876 OK
2019-07-25 14:00:20 INFO brokerOutApi_thread_1 - register broker[0]to name server 192.168.108.128:9876 OK
名两行出现broker注册到nameserver成功,则集群搭建成功
问题
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.2:10911> failed
解决方案
出现该问题是因为虚拟机中搭建了docker产生了虚拟网卡过多,RocketMQ在访问docker网络时不通产生问题。 首先关闭docker服务
systemctl stop docker
systemctl is-enabled docker #查询是否自启动
systemctl disable docker #禁止自启动
然后重启虚拟机即可解决
测试
通过java代码对集群进行测试
生产者代码:
public class producer {
public static void main(String[] args) throws Exception {
//创建生产者,并设置group
DefaultMQProducer producer = new DefaultMQProducer("group");
//设置nameserver
producer.setNamesrvAddr("192.168.108.128:9876");
//启动生产者
producer.start();
//创建发送的消息,发送100条数据
for (int i = 0; i < 100; i++){
Message msg = new Message("TopicTest", "TagA",("MQ Test"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送
SendResult result = producer.send(msg);
System.out.println(result);
}
//关闭生产者
producer.shutdown();
}
}
消费者代码
public class consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("192.168.108.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.println(Thread.currentThread().getName() + " Receive Message: " + list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
System.out.println("consumer start...");
consumer.start();
}
}
测试结果
生产者
从控制台可以看到,在消费者端显示消息发送成功的数据,sendStatus均为ok,消费者端显示读取出来对应的消息,集群搭建成功
可视化管理平台
RocketMQ拥有一个可视化的管理平台,可以通过图形界面的方式查看集群情况、topic、生产者、消费者等具体信息 地址: RocketMQ扩展,项目拉下来后进入目录rocketmq-console 项目使用springboot搭建,在配置文件application.properties中修改配置rocketmq.config.namesrvAddr
rocketmq.config.namesrvAddr=192.168.108.128:9876;192.168.108.129:9876
启动项目,运行App.java。访问localhost:8080即可进入管理平台
猜你喜欢
- 2024-12-12 RocketMQ同一个消费者唯一Topic多个tag踩坑经历
- 2024-12-12 腾讯云微服务正式发布RocketMQ Serverless版本
- 2024-12-12 3分钟白话RocketMQ系列—— 核心概念
- 2024-12-12 RocketMQ如何避免未来再次发生积压
- 2024-12-12 rocketmq延迟消息实现原理(上)
- 2024-12-12 RocketMQ跨队列的顺序消费
- 2024-12-12 Kafka、RabbitMQ、RocketMQ、ActiveMQ 等多个分布式消息队列比较
- 2024-12-12 应如何在 Spring Boot 中使用 RocketMQ 实现批量消息消费?
- 2024-12-12 RocketMQ 5.0 多语言客户端的设计与实现
- 2024-12-12 从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)