APP下载

分散式架构核心元件之讯息伫列RabbitMQ

消息来源:baojiabao.com 作者: 发布时间:2024-05-19

报价宝综合消息分散式架构核心元件之讯息伫列RabbitMQ

一、为什么使用讯息伫列

1、实现解耦

耦合指不同模组/系统之间相互作用、相互依赖的关系。RabbitMQ以异步的方式解耦系统间的关系,使用者将业务请求传送到Rabbit服务器,然后就可以返回了,Rabbit会确保请求被正确处理,即使遇到网络异常、Rabbit服务器崩溃等特殊场景。针对这些特殊场景,Rabbit提供了各种机制确保其可用性。

2、实现异步

对于大批量的使用者操作,同步等待需要很长时间,利用MQ的释出订阅模型,异步处理响应结果可以避免使用者不必要的等待。

举一个简单MQ使用场景:使用手机APP实时转账,就采用了MQ低延迟特性,将转账讯息投递到MQ中,然后由消费者(对方银行)进行处理,具体到账时间以对方银行收到转账讯息,处理完转账操作时间为准。

3、流量消峰

对于像电商网站,每逢店庆、双11等节日会出现瞬间访问量特别大的情况。不能因为系统支援的并发量有限而拒绝部分使用者请求,通常做法是承接所有使用者请求转变为讯息存放到MQ中,由后端业务系统按照顺序进行处理。借助MQ低延迟高吞吐量特性,可以建立多消费者实现负载,从而实现流量消峰。

另外可以利用MQ释出订阅模型可以实现讯息广播。

二、RabbitMQ工作模型

RabbitMQ中生产者传送讯息、消费者接收讯息都需要与Broker建立连线Connection(TCP长连线),为了减少Broker效能损耗,RabbitMQ引入了讯息通道/通道Channel,Channel是在应用程序与Broker的Connection连线中建立的虚拟连线,在传送接收讯息时直接操作Channel即可,不需要频繁与Broker建立连线。另外Channel还是最重要的程式设计界面,对交换机Exchange、伫列Queue操作的很多API都是对Channel方法的封装。

RabbitMQ讯息投递引用了交换机Exchange,当一个讯息携带了地址(Routing Key)时,交换机会根据携带的路由关键字传送到不同伫列Queue。交换机Exchange和伫列Queue的对应关系需要进行事先系结Binding。伫列Queue和交换机是多对多的关系,交换机可以系结多个伫列Queue,伫列Queue也可以系结到不同交换机。

同时为了节省服务器资源,RabbitMQ引入了Virtual Host虚拟机器机制,我们可以把Virtual Host虚拟机器当做一个小型的RabbitMQ服务器。在同一个Broker中可以建立多个Virtual Host虚拟机器,每个Virtual Host虚拟机器中都能建立自己的交换机Exchange和伫列Queue,以及它们的系结关系。从而实现了硬件服务器的高效利用以及资源的隔离。

RabbitMQ中核心概念:

1、Server:又称Broker,接收客户端的连线,实现AMQP实体服务。

2、Connection:应用程序与Broker的网络连线。

3、Channel:网络通道,几乎所有的操作都在Channel中进行,包括定义Queue、定义Exchange、系结Queue与Exchange、释出讯息等。Channel是进行讯息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。

4、Message:服务器和应用程序之间传送的讯息,由Properties和Body组成。Properties可以对讯息进行修饰,比如讯息的优先级、延迟等高阶特性;Body就是讯息体内容。

5、Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的讯息路由。一个Virtual Host可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同的Exchange和Queue。

6、Exchange:接收讯息的交换机,根据路由关键字转发讯息到系结的伫列。RabbitMQ中有三种常用的交换机型别:

6.1、直连交换机direct:通过Binding关键字将Queue伫列和直连交换机建立连线。伫列Queue系结直连交换机时,必须指定一个关键字。传送讯息时会携带一个路由关键字,RabbitMQ会将路由关键字和系结关键字进行一个全字元匹配。

channel.basicPublish("My_Direct_Exchange","spring","hello world");

channel.basicPublish("My_Direct_Exchange","struts","hi struts");

6.2、主题交换机topic:系结关键字时,不需要全字元匹配,可以采用万用字元。

* 代表一个单词

# 代表零个或多个单词

channel.basicPublish("My_Direct_Exchange","junior.netty","hello world");

6.3、广播交换机fanout:不再需要任何系结关键字,交换机会将讯息分发到所有伫列Queue。

7、Binding:Exchange和Queue之间的虚拟连线,binding中可以包含routing key。

8、Routing key:一个路由规则,虚拟机器可用它来确定如何路由一个特定讯息。

9、Queue:也称为Message Queue讯息伫列,储存讯息并将它们转发给消费者,多个消费者可以订阅同一个Queue,这时Queue中的讯息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的讯息并处理。

10、Prefetch count:如果有多个消费者同时订阅同一个Queue中的讯息,Queue中的讯息会被平摊给多个消费者。这时如果每个讯息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设定prefetchCount来限制Queue每次传送给每个消费者的讯息数,比如我们设定prefetchCount=1,则Queue每次给每个消费者传送一条讯息;消费者处理完这条讯息后Queue会再给该消费者传送一条讯息。

三、Spring Boot整合RabbitMQ

RabbitMQ安装成功后登入RabbitMQ控制台http://192.168.1.112:15672/,设定Exchange、Queue、Routing Key。Spring Boot整合RabbitMQ步骤:

1、引入相关依赖

org.springframework.boot

spring-boot-starter-amqp

2、配置application.yml

## producer配置

spring:

rabbitmq:

addresses: 192.168.1.110:5672

username: wangpf

password: 111111

virtual-host: /

server:

port: 8001

servlet:

context-path: /

## consumer配置

spring:

rabbitmq:

addresses: 192.168.1.110:5672

username: guest

password: guest

virtual-host: /

listener:

simple:

concurrency: 5

acknowledge-mode: manual

max-concurrency: 10

prefetch: 1

server:

port: 8002

servlet:

context-path: /

3、例项程式码

生产者RabbitSender

@Component

public class RabbitSender {

//自动注入RabbitTemplate模板类

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private BrokerMessageLogMapper brokerMessageLogMapper;

//回拨函式: confirm确认

final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.err.println("correlationData: " + correlationData);

String messageId = correlationData.getId();

if(ack){

//如果confirm返回成功 则进行更新

brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());

} else {

//失败则进行具体的后续操作:重试 或者补偿等手段

System.err.println("异常处理...");

}

}

};

//传送讯息方法呼叫: 构建自定义物件讯息

public void sendOrder(Order order) throws Exception {

// 通过实现 ConfirmCallback 界面,讯息传送到 Broker 后触发回拨,确认讯息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

rabbitTemplate.setConfirmCallback(confirmCallback);

//讯息唯一ID

CorrelationData correlationData = new CorrelationData(order.getMessageId());

rabbitTemplate.convertAndSend("order-exchange", "order.msg", order, correlationData);

}

}

消费者RabbitReceiver

@Component

public class RabbitReceiver {

//配置监听的哪一个伫列,同时在没有queue和exchange的情况下会去建立并建立系结关系

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "order-queue",durable = "true"),

exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"),

key = "order.*"

)

)

//如果有讯息过来,在消费的时候呼叫这个方法

@RabbitHandler

public void onOrderMessage(@Payload Order order, @Headers Map headers, Channel channel) throws IOException {

//消费者操作

System.out.println("---------收到讯息,开始消费---------");

/**

* Delivery Tag 用来标识通道中投递的讯息。RabbitMQ 推送讯息给 Consumer 时,会附带一个 Delivery Tag,

* 以便 Consumer 可以在讯息确认时告诉 RabbitMQ 到底是哪条讯息被确认了。

* RabbitMQ 保证在每个通道中,每条讯息的 Delivery Tag 从 1 开始递增。

*/

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

/**

* multiple 取值为 false 时,表示通知 RabbitMQ 当前讯息被确认

* 如果为 true,则额外将比第一个引数指定的 delivery tag 小的讯息一并确认

*/

boolean multiple = false;

//ACK,确认一条讯息已经被消费

channel.basicAck(deliveryTag,multiple);

}

}

2019-01-23 08:39:00

相关文章