APP下载

高阶开发必须掌握RabbitMQ之初识RabbitMQ

消息来源:baojiabao.com 作者: 发布时间:2024-05-19

报价宝综合消息高阶开发必须掌握RabbitMQ之初识RabbitMQ

RabbitMQ

我们知道AMQP(高阶讯息伫列协议) 是一个用于在分散式系统中储存转发讯息进行通讯网络协议。而RabbitMQ是实现AMQP协议的讯息中介软件的一种(主要用于应用程序的异步通讯和解偶,讯息的传送者无需关心讯息使用者,反之亦然。),RabbitMQ的服务器端用Erlang语言编写,支援多种客户端,如:Python、.NET、Java、JMS、C、PHP等,支援AJAX。

既然RabbitMQ是对AMQP的具体实现。要想彻底的了解RabbitMQ的设计,我们必须要先来了解AMQP 模型。

这里大家如果有兴趣的可以去阅读官方文件AMQP_0-9-1_Model,这里我简单归纳总结一下 AMQP模型。

从上一篇章的应用解耦需求出发,我们来看看讯息中介软件(messaging middleware broker)及其上下游系统所扮演的角色。

很显然,在上面的使用场景分析中我们可以看出来,讯息中介软件(brokers) 主要承担一个讯息(message)容器的角色,它接收从释出者(publishers) 亦称生产者(producers)那儿来的讯息。并根据既定的路由规则把接收到的讯息传送给处理讯息的消费者(consumers) 处理(实际上rabbitmq是生产者(producers) 投递到交换机(exchange) ,然后exchange按照路由规则分发到特定的伫列(queue) ,再推送给消费者(consumers),或者消费者(consumers)主动拉取。)。

OK,到了这一步,我们对用讯息中介软件的模型设计有了一定的了解,接下来,我们先看看官方的AMQP模型设计:

AMQP模型

AMQP协议模型设计的工作过程如下:讯息(message) 被发布者(publisher) (国内更多称为生产者,后面我们统称生产者(producers))传送给交换机(exchange) (国内一般比喻成邮局或者邮箱)。然后交换机将收到的讯息根据路由规则分发给系结的伫列(queue) 。最后AMQP的实现者讯息中介软件 (例如rabbitmq)会将讯息投递给订阅了此伫列的消费者(consumers) ,或者消费者按照需求来主动拉取。

图示如下:

从模型设计,我们可以清楚的了解AMQP的所有元件及其功能,接下来,我们介绍我们今天的主角,实现AMQP协议的讯息中介软件rabbitmq。

AMQP协议层角色相关的概念

生产者(producer):产生讯息的应用,能够传递讯息到讯息中介软件的应用。

讯息中介软件(brokers):讯息传递的中间载体,即我们今天的主角rabbitmq。

消费者(consumers):接收并处理讯息的应用。从讯息中介软件获取讯息并处理。

连线(Connection):生产者 或 消费者 和 讯息中介软件之间需要建立起连线。AMQP应用层协议使用的是能够提供可靠投递的TCP连线,AMQP的连线通常是长连线,AMQP使用认证机制并且提供TLS(SSL)保护。当我们的生产者 或 消费者 不再需要连线到讯息中介软件的的时候,需要优雅的释放掉它们与讯息中介软件TCP连线,而不是直接将TCP连线关闭。

通道(channel):通常情况下生产者 或 消费者 需要与 讯息中介软件之间建立多个连线。无论怎样,同时开启多个TCP连线都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP协议提供了通道(channel)这个概念来处理多连线,可以把通道理解成共享一个TCP连线的多个轻量化连线。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

讯息中介软件相关的概念

虚拟主机(vHosts):虚拟主机概念,一个Virtual Host里面可以有若干个Exchange和Queue,我们可以控制使用者在Virtual Host的许可权。后面使用篇章再详细说明。

使用者(User):最直接了当的认证方式,谁可以使用当前的讯息中介软件。

交换机(Exchange):交换机接收生产者发出的讯息并且路由到由交换机型别和被称作系结(bindings)的规则所决定的到伫列中,交换机不储存讯息。

在宣告交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

● Name(名称)

● Durability (持久化):讯息代理重启后,交换机是否还存在。交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在讯息中介软件(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在讯息中介软件再次上线后重新被宣告)。然而并不是所有的应用场景都需要持久化的交换机。

● Auto-delete (自动删除):当所有与之系结的讯息伫列都完成了对此交换机的使用后,是否自动删掉它。

● Arguments(引数):alternate-exchange等,指定无法路由时的辅助路由。

讯息伫列(Queue):储存还未被消费者消费的讯息的容器,伫列具有以下属性:

● Name(名称)

● Durable(持久化):讯息中介软件重启后,伫列是否依旧存在。持久化伫列(Durable queues)会被储存在磁盘上,当讯息中介软件(broker)重启之后,它依旧存在。没有被持久化的伫列称作暂存伫列(Transient queues)。这里需要注意伫列的持久化和它储存的未被消费讯息的持久化是2个概念,伫列的持久化并不会使储存的讯息持久化。假如讯息中介软件(broker)重启之后,持久化伫列会被重新宣告,但它里面储存的讯息只有设定过持久化的讯息才能被重新恢复。

● Exclusive(专用伫列):可以这样理解当建立这个伫列的Connection关闭后伫列即被删除,不存在其它的使用可能性。

● Auto-delete(自动删除):当没有消费者订阅这个伫列的时候就会被删除。

● Arguments(引数):讯息中介软件用来完成类似设定最大长度死信伫列等的引数。

讯息伫列在宣告(declare)后才能被使用。如果一个伫列尚不存在,宣告一个伫列会建立它。如果宣告的伫列已经存在,并且属性完全相同,那么此次宣告不会对原有伫列产生任何影响。 如果宣告中的属性(名称除外)与已存在伫列的属性有差异,那么则会申明失败。

讯息(message): 生产者产生的和消费者处理的讯息。RabbitMQ的讯息是有属性概念的。有些属性是被讯息中介软件所使用的,但是大多数是开放给接收它们的应用直译器用的。我们也可以为讯息定义讯息头(headers)。讯息属性需要在讯息被发布的时候定义。例如:

● Content type(内容型别)

● Content encoding(内容编码)

● Routing key(路由键)

● Delivery mode (persistent or not) 投递模式(持久化 或 非持久化)这里需要注意,讯息的持久化依赖与伫列的持久化,我们需要同步设定。

● Message priority(讯息优先权)

● Message publishing timestamp(讯息释出的时间戳)

● Expiration period(讯息有效期)

● Publisher application id(释出应用的ID)

路由键(routing key):路由关键字,交换机exchange的路由规则利用这个关键字进行讯息投递到讯息伫列。( 路由键长度不能超过255个字节)

系结(Binding):Binding可以理解为交换机Exchange路由讯息到讯息伫列的路由规则关系(即讯息伫列和交换机的系结)。当交换机Exchange收到生产者传递的讯息Message时会解析其Routing Key,Exchange根据Routing Key与交换机型别Exchange Type将Message路由到讯息伫列中去。

概念固然很多,但是这个时候一定要保持耐心,知道概念是第一步,理解它是第二步,运用它是第三步,只有到了第三步,我们才算是真正的能解决问题。

我这里给出 rabbitmq JAVAClient 的API,做技术的,看API更加有助于理解。这里如果不能全部理解也不要着急,一步步消化。

//第一步配置连线工厂

ConnectionFactory factory = new ConnectionFactory();

//设定使用者名称

factory.setUsername("test");

//设定使用者密码

factory.setPassword("test321");

//设定虚拟主机(虚拟主机和使用者的许可权管理 我们后面抽一小篇描述)

factory.setVirtualHost("test");

//设定主机地址

factory.setHost("192.168.199.188");

//设定埠

factory.setPort(5672);

Connection conn = null;

//或者用下面配置连线工厂(分引数配置 和 URI 二选一)

try {

factory.setUri("amqp://test:[email protected]:5672/test");

} catch (Exception e) {

e.printStackTrace();

}

//建立连线

try {

conn = factory.newConnection();

} catch (Exception e) {

e.printStackTrace();

}

//建立channel

Channel channel = null;

if(conn != null){

try {

channel = conn.createChannel();

} catch (IOException e) {

e.printStackTrace();

}

}

//exchanges和queues是Client端应用所必须的。在使用之前必须先“declared”(宣告),确保在使用之前已经存在,如果不存在则建立它,如果存在,申明不回有任何影响的,这些操作都包含在declare里。

if(channel !=null){

/**

* 申明exchange的API.

* @param exchange 名称

* @param type 型别

* @param durable 持久化 defaul true

* @param autoDelete 自动删除 defacult true

* @param internal 内部使用 true if the exchange is internal, i.e. can't be directly published to by a client.

* @param arguments 额外引数

* @return a declaration-confirm method to indicate the exchange was successfully declared

* @throws java.io.IOException if an error is encountered

*/

/*Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException;*/

try {

//交换机的引数配置(这里API给全,大家按需配置,重点是了解)

Map arguments =new HashMap();

/*

当一个讯息不能被route的时候,如果exchange设定了AE,则讯息会被投递到AE。如果存在AE链,则会按此继续投递,直到讯息被route或AE链接束或遇到已经尝试route过讯息的AE。

*/

arguments.put("alternate-exchange", "amq.direct");

//申明我们的exchange(具体的API呼叫)

AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare("test", BuiltinExchangeType.DIRECT, true, false,false,arguments);

//申明死信伫列用的(重点关注上面的API,这里大家也能看出来,预留后面用的上)

channel.exchangeDeclare("dead", BuiltinExchangeType.DIRECT, true, false,false,null);

} catch (IOException e) {

e.printStackTrace();

}

/**

* 伫列申明API

* @param queue 名称

* @param durable 持久化 defaul true

* @param exclusive 专用性 defaul true

* @param autoDelete 自动删除 defacult true

* @param arguments 额外引数

* @return a declaration-confirm method to indicate the queue was successfully declared

* @throws java.io.IOException if an error is encountered

*/

/* Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException;*/

//伫列的引数配置(这里API给全,大家按需配置)

Map map = new HashMap();

//标志伫列中的讯息存活时间,也就是说伫列中的讯息超过了指定时间会被删除(数字型别,标志时间,以豪秒为单位)

map.put("x-message-ttl", 1 * 24 * 60 * 60 * 10000);

//伫列自身的空闲存活时间,当前的queue在指定的时间内,没有consumer、basic.get也就是未被访问,就会被删除。(数字型别,标志时间,以豪秒为单位)

map.put("x-expires",60 * 60 * 10000);

//最大长度和最大占用空间,设定了最大长度的伫列,在超过了最大长度后进行插入会删除之前插入的讯息为本次的留出空间(预设操作是如此,我们可以设定overflow来改变,例如用在并发缓冲时),相应的最大占用大小也是这个道理,当超过了这个大小的时候,会删除之前插入的讯息为本次的留出空间。

map.put("x-max-length",10000);

map.put("x-max-length-bytes",100 * 1024 * 1024);

//伫列超出最大长度的处理方案 ,伫列溢位的预设处理方案:drop-head (default) 或者拒绝讯息 reject-publish 我们做并发限流的时候需要设定为超出伫列拒绝

map.put("x-overflow","reject-publish");

//惰性伫列会尽可能的将讯息存入磁盘中,而在消费者消费到相应的讯息时才会被载入到内存中,它的一个重要的设计目标是能够支援更长的伫列,即支援更多的讯息储存。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费讯息造成堆积时,惰性伫列就很有必要了。

map.put("x-queue-mode","lazy");

//丛集属性,我们这里暂不讨论

map.put("x-queue-master-locator","");

/*

讯息因为超时或超过限制在伫列里消失,这样我们就丢失了一些讯息,也许里面就有一些是我们做需要获知的。而rabbitmq的死信功能则为我们带来了解决方案。设定了dead letter exchange与dead letter routingkey(要么都设定,要么都不设定)那些因为超时或超出限制而被删除的讯息会被推动到我们设定的exchange中,再根据routingkey推到queue中.

*/

map.put("x-dead-letter-exchange","dead");

map.put("x-dead-letter-routing-key","key");

//伫列所支援的优先级别,列如设定为5,表示伫列支援0到5六个优先级别,5最高,0最低,当然这需要生产者在传送讯息时指定讯息的优先级别,讯息按照优先级别从高到低的顺序分发给消费者

map.put("x-max-priority",5);

try {

//申明一个test伫列 API

channel.queueDeclare("test", true, false, false, map);

} catch (IOException e) {

e.printStackTrace();

}

/**

* 系结伫列和交换机的API

* @param queue 伫列名称

* @param exchange 交换机名称

* @param routingKey 系结的 routingKey

* @param arguments 额外的引数

* @return a binding-confirm method if the binding was successfully created

* @throws java.io.IOException if an error is encountered

*/

/*Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments) throws IOException;*/

try {

//系结交换机 和 伫列

AMQP.Queue.BindOk bindOK = channel.queueBind("test", "test", "test", null);

} catch (IOException e) {

e.printStackTrace();

}

//传递讯息

byte[] messageBodyBytes = "Hello, world!".getBytes();

//我们可以为讯息设定Header

Map headers = new HashMap();

headers.put("latitude", 51.5252949);

headers.put("longitude", -0.0905493);

/**

* 释出讯息API

* @param exchange 交换机名称

* @param routingKey 路由建

* @param mandatory true mandatory标志告诉服务器至少将该讯息route到一个伫列中,否则将讯息返还给生产者

* @param props other properties for the message - 其他属性

* @param body the message body 讯息体

* @throws java.io.IOException if an error is encountered

*/

/*void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

throws IOException;*/

try {

//释出一条讯息

channel.basicPublish("test", "test", true,

(new AMQP.BasicProperties.Builder()

.headers(headers)

.contentType("text/plain")

.deliveryMode(2)

.priority(1)

.userId("test")

.appId("test")

.expiration("60000")

.build()),

messageBodyBytes);

} catch (IOException e) {

e.printStackTrace();

}

//讯息是否进伫列监听处理

channel.addReturnListener(new ReturnListener() {

@Override

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {

}

});

/**

* Request specific "quality of service" settings.

* @param prefetchSize abbitmq会传递的最大讯息总量大小, 0 代表不限制

* @param prefetchCount rabbitmq会传递的最大讯息数, 0 代表不限制

* @param global true 代表装置本channel而不只是customer

* @throws java.io.IOException if an error is encountered

*/

/*void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;*/

try {

//我们可以通过basicQos设定prefetchCount 限制每个消费者在收到下一个确认回执前一次可以最大接受多少条讯息。即如果设定prefetchCount =1,RabbitMQ向这个消费者传送一个讯息后,再这个讯息的消费者对这个讯息进行ack之前,RabbitMQ不会向这个消费者传送新的讯息

channel.basicQos(0,1,true);

} catch (IOException e) {

e.printStackTrace();

}

/**

* 伫列pull讯息API

* @param queue the name of the queue

* @param autoAck 自动确认讯息

* @return a {@link GetResponse} containing the retrieved message data

* @throws java.io.IOException if an error is encountered

*/

/*GetResponse basicGet(String queue, boolean autoAck) throws IOException;*/

try {

//自动获取讯息(pull)下面是Push方式,二选一

GetResponse response = channel.basicGet("test",true);

} catch (IOException e) {

e.printStackTrace();

}

/**

* Start a non-nolocal, non-exclusive consumer.

* @param queue 伫列名称

* @param autoAck 是否自动确认讯息处理

* @param consumerTag 消费者标签

* @param callback an interface to the consumer object 消费者处理

* @return the consumerTag associated with the new consumer

* @throws java.io.IOException if an error is encountered

* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)

*/

/*String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;*/

//不自动确认讯息,下面讯息确认机制篇再详细说明

boolean autoAck = false;

Channel finalChannel = channel;

try {

channel.basicConsume("test", autoAck, "test-consumer-tag",

new DefaultConsumer(finalChannel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException

{

//释出的每一条讯息都会获得一个唯一的deliveryTag

long deliveryTag = envelope.getDeliveryTag();

// 批量确认

finalChannel.basicAck(deliveryTag, true);

}

});

} catch (IOException e) {

e.printStackTrace();

}

}

//最后关闭channel

if(channel != null){

try {

channel.close();

} catch (Exception e) {

e.printStackTrace();

}

}

//最后关闭conn

if(conn != null){

try {

conn.close();

} catch (IOException e) {

e.printStackTrace();

}

}

做JAVA的一般Spring使用比较多的,我这边本来打算给出Spring的API,但是我觉得Spring的API不利于我们学习,真正掌握rabbitmq的API更重要,这里我给出来,大家一则加深理解,后面更多可以当作API来查询使用。

2019-11-10 09:53:00

相关文章