前言
现在越来越多的产品采用的是分散式架构,部署的时候也同样是分散式部署,那么各个应用间的异步通讯大多选择讯息中介软件MQ来处理,那么就回避不了两个问题:1. 传送讯息的顺序性
2. 讯息被重复消费
目前在生产环境,使用较多的讯息伫列中介软件有ActiveMQ,RabbitMQ,Kafka,RocketMQ等,本文的设计是以RocketMQ为例来解决这两个问题。
一、传送讯息的顺序性
1、 什么是顺序讯息?顺序讯息即有序讯息,传送者(Producer)按照顺序传送讯息,消费者(Consumer)按照讯息的传送顺序进行消费。例如:我们在某宝购买一款膝上型电脑,需要下单、支付和订单完成这3个流程,相对应的产生3条讯息,分别是建立订单——订单支付——订单完成,为了保证业务的完整性肯定需要按照这个顺序依次消费才能达到预期目的。
2、 第一种模型
但是在生产环境MQ肯定是丛集部署,例如多Master模式、多Master多Slave模式(异步复制)、多Master多Slave模式(同步双写)等模式。为了保证讯息的顺序模型可能是这样的:

M1:建立订单、M2:订单支付、M3:订单完成
由于MQ Service是丛集部署,假设M1传送到MQ Service1,M2传送到MQ Service2,依次类推。如果要保证M1最先被消费,那么需要M1到达消费端被消费后,通知MQ Service2,然后MQ Service2再将M2传送到消费端,M2被消费后,再通知MQ Service3,将M3传送到消费端。
问题:三条讯息分别传送到三台或者其中两台Service上,就很难保证M1第一个到达MQ丛集,也不能保证最先被消费。加入M2、M3其中任意一个优先于M1到达MQ丛集,并且优先于M1被消费,那么就没有顺序可言了。综合分析这个架构模式并不能保证讯息被MQ顺序消费。
3、 第二种模型
基于第一种模型分析来看要想保证M1、M2、M3能够顺序消费,首先要保证能够顺序传送到同一个MQ Service中,改进后模型如下:

如上图所示,将三条讯息按照顺序传送到同一个MQ Service中,基于先到先被消费原则,依次消费的顺序为M1 > M2 > M3,这样就保证了讯息的顺序性。
如果使用这种设计在正常情况下是没问题的,但是在实际场景中很可能会遇到下面的问题:

生产者、MQ丛集和消费者不可能释出在同一台服务器中,那么讯息在传输过程中就会遇到网络延迟问题。如上图所示,M1和M2在传送给“消费者1”的过程中遇到了延迟问题,M3先于M1和M2被消费,那问题又回到了原点,这种方案依然不能解决讯息被顺序消费的问题。
4、 第三种模型
第二种模型宣告失败,接着分析,我们保证了生产者将3个讯息按照顺序传送给同一个 MQ Service这个逻辑是没问题的,那么为了解决上方网络延迟问题,那我们就把3个讯息传送给同一个“消费者”被消费呢?即使遇到网络问题或者消费者响应问题,M1被消费失败,为了保证讯息一定会被消费,肯定会选择重发讯息到另一个“消费者”端,如下图所示:

如上图所示,将3条讯息传送给“消费者1”,M1被消费的时候遇到问题,没有被消费成功,那么会将讯息传送给“消费者2”进行重试,这样就保证了讯息的顺序性。
但是可能会遇到另一个问题,“消费者1”没有响应有两种情况,一种是M1在网络传输过程中丢失,另一种是“消费者1”已经消费成功了但是返回的响应资讯没有被MQ Service收到。如果是第二种情况重发M1给“消费者2”就会造成M1被重复消费,也就引发了文章开头的第二个讯息重复消费问题。
我们总结一下要保证讯息严格的按照顺序消费,最可行的办法就是:
保证生产者 —— MQService —— 消费者 是一对一对一的关系
5、 MessageQueueSelector实现顺序传送和消费
上述办法虽然可行性最高,但是也存在更加严重的问题,例如:
1. 并行度就会成为讯息系统的瓶颈(吞吐量不够)
2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
看到这里挠挠头又掉了一大把头发,过度设计将会造成效率低下,甚至浪费更多的资源。换种思路,从业务角度来看,保证讯息的顺序性不仅仅是依靠讯息系统,那就寻找更加合理的方式来解决。
RocketMQ本身具有传送顺序讯息功能,那么通过源代码角度来分析一下:
// RocketMQ通过MessageQueueSelector中实现的算法来确定讯息传送到哪一个伫列上
// RocketMQ预设提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定讯息按照何种策略传送到讯息伫列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
备注:send方法带有引数MessageQueueSelector,MessageQueueSelector是让使用者自己决定讯息传送到哪一个伫列,如果是区域性讯息的话,用来决定讯息与伫列的对应关系。
6、 源代码示例
接下来我们就用程式码模拟一下MessageQueueSelector如何使用。
1、 建立一个生产者(Producer)
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for(int i = 0;i int orderId=(int)((Math.random()*9+1)*10000000);
for(int j = 0;j Message msg = new Message("AAA","TagA",("推送的订单ID为="+orderId).getBytes());
try {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},orderId);
System.out.println(sendResult);
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
上述生产者程式码中我们看到在传送讯息的时候使用了两个for循环来模拟场景,第一个for循环是产生6个订单,按照MQ的负载策略6个订单将分别传送到不同的消费者端。第二个for循环是每个订单里面产生3条有序的订单讯息(M1、M2、M3),订单id是随机生成不重复的9位数字(生产场景使用不同的规则)。
2、 建立两个消费者(Consumer)
消费者1:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("AAA","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list){
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("接收到讯息1:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
消费者2:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("AAA","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list){
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("接收到讯息2:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
3、 执行测试
先启动两个消费者,启动成功后再启动生产者,执行结果如下:
生产者执行资讯
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0061910000, offsetMsgId=3452373400002A9F000000000006325B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=397]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00621C0001, offsetMsgId=3452373400002A9F0000000000063315, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=398]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062610002, offsetMsgId=3452373400002A9F00000000000633CF, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=399]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062A00003, offsetMsgId=3452373400002A9F0000000000063489, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=919]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062DF0004, offsetMsgId=3452373400002A9F0000000000063543, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=920]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00631F0005, offsetMsgId=3452373400002A9F00000000000635FD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=921]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00635F0006, offsetMsgId=3452373400002A9F00000000000636B7, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=348]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00639E0007, offsetMsgId=3452373400002A9F0000000000063771, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=349]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0063DE0008, offsetMsgId=3452373400002A9F000000000006382B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=350]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064210009, offsetMsgId=3452373400002A9F00000000000638E5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=327]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006462000A, offsetMsgId=3452373400002A9F000000000006399F, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=328]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064A2000B, offsetMsgId=3452373400002A9F0000000000063A59, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=329]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064E5000C, offsetMsgId=3452373400002A9F0000000000063B13, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=330]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006525000D, offsetMsgId=3452373400002A9F0000000000063BCD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=331]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006565000E, offsetMsgId=3452373400002A9F0000000000063C87, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=332]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065A7000F, offsetMsgId=3452373400002A9F0000000000063D41, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=922]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065E60010, offsetMsgId=3452373400002A9F0000000000063DFB, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=923]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0066250011, offsetMsgId=3452373400002A9F0000000000063EB5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=924]
一共产生了6组资料,每组3条讯息,共传送了18条讯息。
消费者1执行资讯:
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到讯息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
消费者1接收到了3组讯息,共9条资讯。
消费者2执行资讯:
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到讯息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
消费者1接收到了3组讯息,共9条资讯。
备注:通过执行结果可以看出生产者传送的讯息会根据MessageQueueSelector实现的算法来选择一个伫列,那么相同的策orderId讯息自然是传送到了同一个伫列,那么在消费的时候也会被一起消费。
二、讯息被重复消费
在上面设计订单顺序消费的时候有丢掷一个新的问题,就是讯息重复。但是熟读RocketMQ API的朋友应该都知道,RocketMQ是不提供重复消费问题的解决方案。那么先来了解一下什么是重复性消费:
1、 什么是讯息重复消费?
在网络不可达的情况下,只要通过网络今夕资料交换,就不可避免的产生同一条讯息被不同两个或两个以上的消费者消费。
2、 解决方案
如果消费端收到两条一样的资讯,怎么处理呢?
消费端处理讯息的业务逻辑保持幂等性保证每条讯息都有唯一编号且保证讯息处理成功与去重表的日志同时出现第1条的原理是,只要保持幂等性,不管来多少条重复讯息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的msgId,如果新到的msgId已经在日志表中,那么就不再处理这条讯息。
第1条解决方案,很明显应该在消费端实现,不属于讯息系统要实现的功能。第2条可以讯息系统实现,也可以业务端实现。正常情况下出现重复讯息的概率其实很小,如果由讯息系统来实现的话,肯定会对讯息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理讯息重复的问题,这也是RocketMQ不解决讯息重复的问题的原因。
三、总结
要实现顺序消费根据文中的方案就可以解决掉这个问题,但是在实际应用场景中肯定不会这么简单。跟其他朋友在聊这个话题的时候,他们还采用了其他的解决方案,比如有先后业务逻辑耦合的讯息不通过MQ实现,通过业务程式码实现,吞吐瓶颈可以通过多执行绪解决,当然体量过大还是需要MQ的。
由于RocketMQ并不保证讯息不重复,如果你的业务需要保证严格的不重复讯息,需要你自己在业务端去重,这样问题始终在可控范围内。






























