APP下载

从源代码告诉你 RocketMQ的tag有什么坑

消息来源:baojiabao.com 作者: 发布时间:2024-05-18

报价宝综合消息从源代码告诉你 RocketMQ的tag有什么坑

作者:kinnylee

来源:https://0x9.me/wPORU

背景介绍

专案组使用阿里RocketMQ,对同一个消费组设定不同的tag订阅关系,出现讯息丢失的问题,本文从rocketmq源代码研究讯息释出与订阅原理,并分析导致该问题的原因。

官方说明

告诉使用者:同一个消费组,必须保持订阅关系一致为什么?它没有说!只能从源代码找答案

问题复现

1.启动消费者1,消费组为group1,订阅topicA的讯息,tag设定为tag1 || tag2

2.启动消费者2,消费组也为group1,也订阅topicA的讯息,但是tag设定为tag3

3.启动生产者,生产者传送含有tag1,tag2,tag3的讯息各10条

4.消费者1没有收到任何讯息,消费者2收到部分讯息

先上结论

同一个消费组中,设定不同tag时,后启动的消费者会覆盖先启动的消费者设定的tagtag决定了讯息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分讯息原理说明

讯息如何储存

CommitLog

储存所有topic的原始讯息CommitLog分为多个档案,每个档案预设最大为1G每条记录包括:讯息长度和讯息文字(讯息体,属性,uid等等)因每条讯息长度不一致,每个commitLog的记录长度也不一致

ConsumerQueue

储存某个Topic下某个Queue的索引资讯每条记录包括:讯息在commitLog中的offset,讯息大小,讯息tag的杂凑值每条记录长度固定为20byteproducer传送讯息后,先储存到commitLog,再异步建立该条讯息对应的topic + queue对应的ConsumerQueue索引第三部分的Hash(tag)是服务端过滤讯息的重要依据

consumer如何订阅讯息

注册订阅资讯

consumer订阅时,会将订阅资讯注册到到服务端储存订阅资讯的是Map类,key为topic,value主要是tagsubVersion取当前时间。这里的key是topic,subVersion版本号,这两点很关键!后面有用到!

拉取讯息并过滤

拉取讯息时,首先从服务端获取订阅关系,得到tag的hash集合codeSet然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到讯息过滤的目的,决定是否将该讯息传送给consumer总之一句话:tag决定了讯息是否发到客户端讯息过滤

服务端过滤

过滤:tag的hash值过滤优点:减少不必要讯息占用流量缺点:Hash存在冲突,过滤不完全准确

客户端过滤

服务端过滤存在不准确性,客户端再次精确过滤客户度过滤:tag的字串值做对比。不相等的不返回给消费者原因总结

同一个consumer group的订阅关系,储存在RebalanceImpl类的Map中。key为topic不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只储存tag2.过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的讯息客户端接收讯息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何讯息。消费者2能收到一半的讯息(丛集模式,假设讯息平均分配,另外一半分给tag2)源代码分析

订阅关系资料结构

消费者1启动时注册的订阅关系

消费者2后启动覆盖订阅关系

服务端过滤时取出ConsumerQueue的Hash(tag)

对比讯息的Hash(tag)和之前储存的订阅关系

客户端过滤

2019-06-30 04:45:00

相关文章