RocketMq试用+简单的Spring集成
经过2天的试用初步了解了一下RocketMq的基本用法,搜索了一下度娘,没有找到Spring的例子,所以简单搞了一点代码感受一下。
1.RocketMq
RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ,有以下特点: 1) 能够保证严格的消息顺序 2) 提供丰富的消息拉取模式 3) 高效的订阅者水平扩展能力 4)实时的消息订阅机制 5)亿级消息堆积能力
2.核心原理
2.1. 数据结构
(1)所有数据单独储存到commit Log ,完全顺序写,随机读 (2)对最终用户展现的队列实际只储存消息在Commit Log 的位置信息,并且串行方式刷盘 (3)按照MessageId查询消息
(4)根据查询的key的hashcode%slotNum得到具体的槽位置
(5)根据slotValue(slot对应位置的值)查找到索引项列表的最后一项 (6)遍历索引项列表返回查询时间范围内的结果集
2.2. 刷盘策略
rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取
使用简单的符号标识不同的标题,将某些文字标记为粗体或者
斜体
,创建一个等,详细语法参考帮助?。
本编辑器支持 Markdown Extra , 扩展了很多好用的功能。具体请参考[Github][2].
2.3. 内存机制
2.4. 工作模式
3. 环境安装
3.1. JAVA环境安装
安装
rpm -ivh jdk-7u80-linux-x64.rpm
环境变量
JAVA_HOME=/usr/java/jdk1.7.0_80 CLASSPATH=.:$JAVA_HOME/lib.tools.jar PATH=$JAVA_HOME/bin:$PATH export JAVA_HOME CLASSPATH PATH export ROCKETMQ_HOME=/usr/local/service/alibaba-rocketmq
3.2. RocketMq安装
https://github.com/alibaba/RocketMQ/releases下载3.2.6,解压
4. 测试网络拓扑
因为手里没有其他服务器,105那台缺少一个slave,在同步双写模式下,发送消息会返回 SLAVE_NOT_AVAILABLE
,不过消息已经发送成功,只是slave没有写成功。
5. 启停操作
这里只给出一个基本的示例,各个模式的启停在本文最后的参考文献中会有详细的说明。这里不再赘述。
- 启动nameserver
nohup ./mqnamesrv &复制代码
- 停止nameServer
./mqshutdown namesrv复制代码
- 启动broker(单master)(多master,多master+slave)对应的(异步复制,同步双写)
nohup sh mqbroker -n 192.168.146.109:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &复制代码
- 停止broker
./mqshutdown broker复制代码
6. 运维指令
- 查看集群情况
./mqadmin clusterList -n 127.0.0.1:9876复制代码
- 查看broker状态
./mqadmin brokerStatus -n 127.0.0.1:9876 -b 192.168.146.105:109111复制代码
- 查看topic列表
./mqadmin topicList -n 127.0.0.1:98761复制代码
- 查看topic状态
./mqadmin topicStatus -n 127.0.0.1:9876 -t PushTopic复制代码
- 查看topic路由
./mqadmin topicRoute -n 127.0.0.1:9876 -t PushTopic复制代码
7. 基本测试
基本测试采用java直接编码的方式生产和消费消息,例子来源于参考文献的《RocketMQ开发教程》。本文最后的代码示例,采用了spring的形式。
- Producer
package com.jd.wxz;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class Producer { public static void main(String[] args){ DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("192.168.146.109:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PullTopic", "pull", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } }}复制代码
- Consumer
package com.sean;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("192.168.146.109:9876"); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( Listlist, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } }}复制代码
运行结果:
服务端监控:
8.宕机实验
9.遗留问题
1) 关闭master 自动切换到slave无法实现,官方资料上没有明确指明,第三方文档里有(见文献3)。2) 在开发机服务器上运行os.sh进行优化,导致网络无法连接,运维帮忙重启才恢复。复制代码
技术的提升是需要下苦工,需要坚持不懈的努力。就比如下面的分享的这些技术点,是否都学会并掌握了呢?如需要以下图谱以及跟多提升架构技术的资源可加入我的粉丝Qqun:855801563。我花了将近一个月时间搜集整理了一套架构技术提升知识点讲解以及一些面试题解析和答案免费分享给大家。助力各位程序员朋友突破自我提升技能,实现自己的目标。