网站首页 > 博客文章 正文
本文为极客时间《消息队列高手》学习笔记,为了按照课程的安排进行学习,这里只是对RabbitMQ简单说明,后续会专门再出一下相关的文章进行详细学习。
不论是挑选消息队列,还是挑选其他的开源技术栈,我们实际上都需要考虑系统的高可靠、高可用以及高性能三个点,对应到消息队列就是:
- 高可靠:消息的传递不能丢失;
- 高可用:支持集群,不会出现单点问题;
- 高性能:能够满足大多数场景性能要求,具备一定的通用性;
1.RocketMQ
RocketMQ对在线业务的响应时延做了很多优化,大多数情况下可以做到毫秒级的响应,如果应用场景很在意响应时延可以选用RocketMQ,且每秒钟的处理请求可以在几十万量级。
1.1 发布订阅模型
发布订阅模型(Publish-Subscribe Pattern)中,消息的发送方成为发布者Publisher,消息的接收方成为订阅者Subscriber,服务端存放消息的容器称为主题Topic。发布者将消息发送到主题中,订阅者在接收消息之前需要先订阅主题。订阅既是一个动作,也可以认为是主题在消费时的一个逻辑副本,每份订阅中订阅者都可以接收到主题的所有消息。
1.2 RocketMQ的消息模型
RocketMQ使用的就是发布订阅模型,但是也由队列QUEUE这个概念。
几乎所有的消息队列产品都使用“请求-确认”机制确保消息不会在传递过程中由于网络或者服务器故障导致丢失。生产端先将消息发送到服务端Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。
如果生产者没有收到服务端返回的确认或者失败的消息,则会重新发送消息;在消费端,消费者在收到消息并且完成自己的消费业务逻辑后,也会给服务端发送消息消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则,它会给消费者重新发送消息,直到收到对应的消费成功确认。
但是为了保证消息的可靠性,就必须先保证消息的有序性,及前面的消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性原则。
也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。
1.2.1 消费者组与队列的关系
消费者组和队列数并没有关系,队列数量可以根据数据量和消费速度来合理配置,RocketMQ和Kafka都可以支持水平扩容队列数量,但是都需要手动操作。
从主题中的所有队列中取出消息给所有消费者进行消费,消息只能被消费组中的一个线程进行消费,有点类似线程池的形式,工作线程消费来自不同队列的消息,感觉这也是RocketMQ低延时的原因,不同队列中的消息可以同时被消费,并且消费组的线程也可以并发的消费不同的消息。
1.2.2 producer和队列
producer会往所有队列发送消息,但不是“同一条消息每个队列都发一次”,每条消息只会往某个队列里面发送一次,而不是发往所有的队列。至于说消息最终写入哪个队列,主要是决定于当前使用的算法,一般来说默认是轮询,也可以是取模之类的算法。
对于一个消费组,每个队列上只能串行消费,因此说,主题并不能保证消息的有序,只有队列中的消息才是有序的。多个队列加一起就是并行消费了,并行度就是队列数量,队列数量越多并行度越大,所以水平扩展可以提升消费性能。
1.2.3 消费位置
每个队列都会维护一个消费位置offset,记录这个消费组在这个队列上消费消息的进度。因此,消费位置并非全局唯一。
由于每个主题中的一个队列都会被多个消费组进行消费,为此需要为每个消费组的消费的不同队列设置一个下标(每个消费组可以一直消费队列中的消息,无需等待其他消费组的确认),主题中的队列消息是有序的,为此需要等到所有消费组对此条消息进行确认,才能从队列中移除,感觉每个消费组的队列下标,可以一个队列维护一个CurrentHashMap来为此每个消费组的下标,这样的话可以防止锁的竞争。
服务器端可以在该消费组中对每个队列实时维护消费位点,并且持久化到磁盘来应对机器宕机后重启的恢复,同时针对磁盘故障也需要提供该消费位点的副本机制来保证。每个queue上的数据被每个消费组消费完应该是不会被立刻删除的,会有一个过期时间,过期后删除,没有删除的数据还可以被新接进来的消费组消费。offset相当于数组下标,broker记录每个消费组在queue上的offset,每个queue上的不同的group的消费者之间不会有影响。consumer A消费失败了offset不会递增,等待消息重发,而consumer B继续向后消费。
总的来说,一个主题Topic下面有多个queue,目的是为了负载均衡,假设我这边一个主题对应8个queue,那么消费端如果有两个实例,那就每个实例消费4个队列的数据。当生产者发送消息的时候,会落在8个queue上的某一个,消费位置由另外一个组件管理,来一个新的消费者组,就在内存和文件中记录消费的位置,如果新增,就在创建新的记录。
2.Kafka
Kafka是与周边生态系统兼容性最好的没有之一,尤其是在大数据和流计算领域,几乎所有的相关软件都会优先支持Kafka。
Kafka使用Scala和Java语言开发,设计上大量使用批量和异步思想,这种设计使得kafka能够做到超高的性能,尤其是异步收发的性能,是三者中最好的,每秒钟的处理消息也在几十万条左右。
Kafka这种异步批量的设计带来的问题是:它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka并不会立即发送出去,而是要一会攒一批再发送,在它的Broker中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka的时延反而会比较高。所以,Kafka不太适合在线业务场景。
Kafka认为在某些情况下,瓶颈实际上不是CPU或者磁盘,而是网络宽带。而其认为的高效压缩,即把多个消息压缩在一起,而不是分别压缩每个消息,然后以这种形式发送到服务端。这批消息将以压缩形式写入,并保持压缩在日志中,并且仅由使用者解压缩。官网相关描述链接为:http://kafka.apache.org/documentation/#majorde。
Kafka的消息模型与RocketMQ是完全一样的,但是在Kafka中,不再成为队列,成为分区partition。
3 思考题:不做严格顺序要求,是否可以实现单个队列的并行消费
问题:刚刚我在介绍RocketMQ的消息模型时讲过,在消费的时候,为了保证消息的不丢失和严格顺序,每个队列只能串行消费,无法做到并发,否则会出现消费空洞的问题。那如果放宽一下限制,不要求严格顺序,能否做到单个队列的并行消费呢?如果可以,该如何实现?欢迎在留言区与我分享讨论。
感觉队列可以维护一个全局的下标,消费队列时,使用CAS进行下标的获取,由于不保证你哼消息消费的有序,这样的话可以并发的消费消息,由于有全局下标,不会出现获取队列的空洞消息。
猜你喜欢
- 2024-09-17 MQ 技术产品井喷,今天来详聊一下腾讯开源消息中间件 TubeMQ | 原力计划
- 2024-09-17 全网最通俗易懂的Kafka入门(kafka 入门)
- 2024-09-17 Kafka、RabbitMQ、RocketMQ等中间件对比—消息发送性能和区别
- 2024-09-17 17个方面对比Kafka、RabbitMQ、RocketMQActiveMQ分布式消息队列
- 2024-09-17 Kafka与RocketMq文件存储机制对比
- 2024-09-17 理解Kafka和其他MQ对比(kafka与其他mq的区别)
- 2024-09-17 RocketMQ架构最全详解(图文全面总结)
- 2024-09-17 滴滴出行基于RocketMQ构建企业级消息队列服务的实践
- 2024-09-17 7张图,带你5分钟入门RocketMQ(rocketmq简介)
- 2024-09-17 大数据相关,Kafka与MQ的区别(kafka跟rabbitmq的区别)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)