APP下载

阿里P7架构师教你如何实现RabbitMQ 延时讯息

消息来源:baojiabao.com 作者: 发布时间:2026-05-29

报价宝综合消息阿里P7架构师教你如何实现RabbitMQ 延时讯息

RabbitMQ 延时讯息的实现(上)

我们在实际业务中有一些需要延时传送讯息的场景,例如:

1、 家里有一台智慧热水器,需要在30分钟后启动

2、 未付款的订单,15分钟后关闭

注意这里的场景是延时,不是定时。当然,解决了延时,定时就很简单了(定时=当前时刻+间隔时间)。

由于RabbitMQ本身不支援延时伫列(延时讯息),所以要通过其他方式来实现。总的来说有三种:

1、 先储存到数据库,用定时任务扫描,登记时刻+延时时间,就是需要投递的时刻

2、 利用RabbitMQ的死信伫列(Dead Letter Queue)实现

3、 利用rabbitmq-delayed-message-exchange外挂

定时任务实现比较简单,此处略过。我们来看一下后两种方案分别怎么实现。

前提知识:我们可以在传送讯息时指定单条讯息的存活时间(Time To Live,TTL)。也可以设定一个伫列的讯息过期时间。

这两种方式,当伫列中的讯息到达过期时间(比如30分钟)仍未被消费,就会被发送到伫列的死信交换机(Dead Letter Exchange,DLX),被再次路由,此时再次路由到的伫列就被称为死信伫列(Dead Letter Queue)。需要注意,死信交换机和死信交换机都是基于其用途来描述的,它们实际上也是普通的交换机和普通的伫列。如果伫列没有指定DLX或者无法被路由到一个DLQ,则伫列中过期的讯息会被直接丢弃。

因此,我们可以利用讯息TTL的特性,实现讯息的延时投递。

1、设定单条讯息的过期时间的方法:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()

.deliveryMode(2) // 持久化讯息

.contentEncoding("UTF-8")

.expiration("10000") // TTL,10秒后没有被消费则被发送到DLX

.build();

channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes()); //此处传送到 AMQP Default 这个预设的Direct型别的交换机,并路由到TEST_TTL_QUEUE伫列

2、设定伫列的讯息过期时间的方法:

Map argss = new HashMap();

argss.put("x-message-ttl",6000); // TTL,6秒后没有被消费则被发送到DLX

channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);

注意:如果同时设定了讯息的过期时间和伫列的讯息过期时间,则会取其中一个较小的值。比如讯息设定5秒过期,伫列设定讯息10秒过期,则实际过期时间是5秒。

基于讯息TTL,我们来看一下如何利用死信伫列(DLQ)实现延时伫列:

总体步骤:

1)建立一个交换机

2)建立一个伫列,与上述交换机系结,并且通过属性指定伫列的死信交换机。

3)建立一个死信交换机

4)建立一个死信伫列

4)将死信交换机系结到死信伫列

5)消费者监听死信伫列

程式码如下:

消费者:

因为此处使用预设的AMQP Default的Exchange,所以省略了第1)步,没有建立交换机。

这里用指定讯息的TTL实现,所以设定伫列TTL属性的程式码注释了。

// 指定伫列的死信交换机

Map arguments = new HashMap();

arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");

// arguments.put("x-expires","9000"); // 设定伫列的TTL

// 宣告伫列(预设交换机AMQP default,Direct)

channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);

// 宣告死信交换机

channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);

// 宣告死信伫列

channel.queueDeclare("DLX_QUEUE", false, false, false, null);

// 系结,此处 Dead letter routing key 设定为 #,代表路由所有讯息

channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");

生产者:

String msg = "Hello world, Rabbit MQ, DLX MSG";

// 设定属性,讯息10秒钟过期

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()

.deliveryMode(2) // 持久化讯息

.contentEncoding("UTF-8")

.expiration("10000") // TTL

.build();

// 传送讯息

channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());

讯息的流转流程

生产者——原交换机——原伫列——(超过TTL之后)——死信交换机——死信伫列——最终消费者

如图:

使用死信伫列实现延时讯息的缺点:

1) 如果统一用伫列来设定讯息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要建立很多交换机和伫列来路由讯息。

2) 如果单独设定讯息的TTL,则可能会造成伫列中的讯息阻塞——前一条讯息没有出队(没有被消费),后面的讯息无法投递。

3) 可能存在一定的时间误差。

下一篇文章,我们来讲一下如何使用rabbitmq的延时讯息外挂来实现延时讯息。

RabbitMQ 延时讯息的实现(下)

在RabbitMQ 3.5.7及以后的版本提供了一个外挂(rabbitmq-delayed-message-exchange)来实现延时伫列功能。同时外挂依赖Erlang/OPT 18.0及以上。

外挂源代码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

外挂下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

1、进入外挂目录

whereis rabbitmq

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2、下载外挂

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果下载的档名带问号则需要改名,例如:

mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez

3、启用外挂

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、停用外挂

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

5、外挂使用

通过宣告一个x-delayed-message型别的exchange来使用delayed-messaging特性。x-delayed-message是外挂提供的型别,并不是rabbitmq本身的(区别于direct、topic、fanout、headers)。

程式码:

消费者(先启动):

// 宣告x-delayed-message型别的exchange

Map argss = new HashMap();

argss.put("x-delayed-type", "direct");

channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,

false, argss);

// 宣告伫列

channel.queueDeclare("DELAY_QUEUE", false,false,false,null);

// 系结交换机与伫列

channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");

// 建立消费者

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

byte[] body) throws IOException {

String msg = new String(body, "UTF-8");

SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

System.out.println("收到讯息:[" + msg + "] 接收时间:" +sf.format(new Date()));

}

};

// 开始获取讯息

channel.basicConsume("DELAY_QUEUE", true, consumer);

生产者(后启动):

// 延时投递,比如延时1分钟

Date now = new Date();

Calendar calendar = Calendar.getInstance();

calendar.add(Calendar.MINUTE, +1);// 1分钟后投递

Date delayTime = calendar.getTime();

SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

String msg = "传送时间:" + sf.format(now) + ",投递时间:" + sf.format(delayTime);

// 延迟的间隔时间,目标时刻减去当前时刻

Map headers = new HashMap();

headers.put("x-delay", delayTime.getTime() - now.getTime());

AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()

.headers(headers);

channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),

msg.getBytes());

channel.close();

conn.close();

控制台输出:

收到讯息:[传送时间:2019-01-15 20:44:41.000,投递时间:2019-01-15 20:45:41.003]

接收时间:2019-01-15 20:45:41.064

2018已经过去过去,2019还想一成不变吗?拥抱变化,突破瓶颈,想要学习Java架构技术的朋友可以加我的群:725219329,群内每晚都会有阿里技术大牛讲解的最新Java架构技术。并会录制录播视讯分享在群公告中,作为给广大朋友的加群的福利——分散式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高并发、高可用架构)/微服务(Spring Boot、Spring Cloud)/源代码(Spring、Mybatis)/效能优化(JVM、TomCat、MySQL)

2019-01-23 23:38:00

相关文章