一、RabbitMQ
首先RabbitMQ是基于AMQP协议模型的一个消息队列项目。主要着重于消息的准确传递。
如果项目所需要的队列是针对吞吐量的队列,如日志收集等,可以用Kafka进行设计。KAFKA追求高吞吐量,对消息的重复、丢失、错误没有严格要求,追求的是性能。
二、RabbitMQ核心概念
Server:又称Broker,接受客户端的连接,实现AMQP实体服务。
Connection:连接,应用程序与Broker的网络连接。
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。
Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容。
Virtual host:虚拟地址(一般用于隔离项目),用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。
Exchange:交换机,接收消息,根据路由转发消息到绑定的队列。
Binding:绑定,Exchange和Queue之间的虚拟连接,binding中可以包含routing key。
Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
RabbitMQ数据存储方式:RabbitMQ有两种数据存储方式,一种是disk硬盘存储,一种是RAM内存存储。
2.1 交换机属性:
Name:交换机名称
Type:交换机类型direct、topic、fanout、headers
- Driect:所有发送到DirectExchange的消息被转发到RouteKey中指定的Queue。RoutKey必须完全匹配才能发送到指定Queue。
- Topic:所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上。(Echange将RouteKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic)
- Fanout:不处理任何路由键,只简单的将队列绑定到交换机上。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Durability:是否持久化,True为持久化
AutoDelete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
Arguments:拓展参数,用于拓展AMQP协议自制。
2.2 消息队列属性:
Durability:是否持久化,True为持久化
AutoDelete:当最后一个监听删除后,自动删除该Queue
2.3 消息队列属性:
本质由Properties和Payload(Body)组成
- Delivery mode
- headers(可以自定义属性)
- content_type、content_encoding、priority
- correlation_id、reply_to(消息发送失败后回到哪个队列)、expiration(消息过期时间)、message_id(消息的ID)
2.4 RabbitMQ消息如何流转
- 生产者发送消息到RabbitMQ(指定某个Exchange和Routing key)
- Message发送到某个Exchange
- 根据路由规则,Exchange将消息发送到某一个或几个Message Queue。
- 由监听相应Message Queue的消费者处理消息。
2.5 Confirm确认消息实现
消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。生产者收到应答,来确认消息是否正常的发送到Broker。
如何实现Confirm确认消息
- 在channel上开启确认模式:channel.confirmSelect()
- 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
- 重写addConfirmListener两个方法,handleNack 失败处理和handleAck成功处理
channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } });
2.6 消费端限流
RabbitMQ提供了一种QOS功能,在非自动确认消息的前提下,如果一定数目的消息(通过基于Consume或者channel设置QOS的值)未被确认前,不进行消费新的消息。
- prefetchSize:0
- prefetchCount:一般设置为1,即一旦有1个消息还没有ACK,则该Consumer将block掉,直到有消息ACK
- global:true/false是否将上面设置应用于channel。简单点说,就是上面限制是channel级别的还是consumer级别。
注意:RabbitMQ对perfetchSize没有实现,global这一项对channel也没有实现。
2.7 Return消息机制
如果发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这时候如果需要监听这种不可达的消息,就要使用Return Listener。
channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println("handler return"); System.out.println(s); System.out.println(s1); System.out.println(s2); System.out.println(bytes); } });
//第三个消息参数需要设置为true channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
2.8 ReQueen机制
在消息处理的时候,如果手动签收,返回NACK的话,消息就会重新回到队列重新消费。
2.9 TTL队列/消息
TTL:Time To Live,生存时间。即RabbitMQ可以设置消息的国企时间。
2.10 死信队列
DLX:Dead Letter Exchange。
利用DLX,当消息在一个队列中变成死信的时候,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
死信:
- 消息被拒绝(basic.reject/basic.nack)而且requeue为false。
- 消息TTL过期。
- 队列达到最大长度。
死信队列设置:
设置一个Exchange和Queue,RoutingKey:#。
之后正常使用中,队列加上一个参数
arguments.put(“x-dead-leter-exchange”,”死信Exchange”)
2.11 MQ推拉模式
https://www.cnblogs.com/gordonkong/p/6939046.html
三、RabbitMQ整合SpringBoot
convertAndSend()函数,传参分析:
- String exchange:指定交换机
- String routingKey:路由键
- Object message:发送的对象
- correlationData:消息唯一ID
onOrderMessage()函数,传参分析:
- @Payload Order order:传送的消息体
- @Headers Map<String,Object> headers:Properties各种属性
- Channel channel:通道(由于在配置时选用manual即手工签收引起的,否则不用写)
3.1 消息发送方
@Controller @RequestMapping("/test") public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") @ResponseBody public String send(Order order)throws Exception{ CorrelationData correlationData=new CorrelationData(); correlationData.setId(order.getMessageId()); rabbitTemplate.convertAndSend( "order-exchange", "order.abcd", order, correlationData ); return ""; } }
3.2 消息接收方
由于SpringBoot整合了RabbitMQ的相当多功能,所以可以通过注解的形式,来进行相关Listener的注册工作。并且在注解完成后,RabbitMQ可以自动的生成相应的Exchange、Queue及RoutingKey。
@Component public class OrderComsumer { @RabbitListener( bindings = @QueueBinding( value=@Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name = "order-exchange",durable = "true",type = "topic"), key = "order.*" ) ) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("收到消息,开始消费"); System.out.println(order.getId()); System.out.println(order.getName()); Long deliveryTag=(long)headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag,false); } }
四、RabbitMQ消息可靠性传递的方案
第一种方案,大多数厂商应用场景
- 业务数据(BIZ DB)(传送的数据)入库、消息操作记录数据入库(MSG DB)入库。(由于大并发,不要加事务)
- 消息发送给RabbitMQ服务。
- 消息成功后,通知生产端。生产段可以定时拉取没有回复的消息,进行重发。
- 投递成功后在MSG DB相应状态改为成功。
- 定时任务拉取MQ Broker没有回复的数据。
- 对数据进行重传。
- 对消息可以设置最大重传次数,如果超过该次数,将MSG DB相应改为发送失败。
第二种方案,最大限度节省数据信息入库操作
- 生产端发送信息到RabbitMQ,将消息入库、入库完成才发消息。
- 紧接着第一次发送消息,延迟第二次发送消息
- 消费端消费消息
- 发送确认消息,是一条真正的消息,不是ACK
- Callback Service监听确认消息,将消息存入数据库
- 监听延迟发送的队列,五分钟后收到二次发送的消息,检查数据库中相应的数据是否存在,存在则正常。
- 若不正常,发起RPC通信,通知发送端重新发送。
五、RabbitMQ集群
5.1 主备模式
实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常的好用且简单。主备模式也成为Warren模式。(主节点挂掉后,从节点提供服务)。
5.2 远程模式
可以实现双活的一种模式,简称Shovel模式,所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,我们可以跨地域的让两个MQ集群互联。远距离的通信和复制,所谓Shovel就是可以把消息进行不同数据中心的复制工作,可以跨地域的让两个MQ集群互联。
5.3 镜像模式
非常经典的Mirror镜像模式,保证100%数据不丢失。为了保证RabbitMQ数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲2-3个节点实现数据同步(对于100%可靠性解决方案一般是3个节点)
5.4 多活模式
实现异地数据复制的主流模式,Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用双活或者多活模型来实现。该模式以来RabbitMQ的federation插件,可以实现持续的可靠的AMQP数据通信。实现该模式,在多套数据中心各部署一套RabbitMQ集群,各中心的RabbitMQ服务除了需要为业务提供正常的消息服务外,中心之间还需实现部分消息共享。
六、实际应用中的一些消息场景
6.1 迅速消息发送场景
迅速消息指不进行落库存储,不做可靠性的保障。在非核心消息、日志数据、或统计分析等场景下比较合适。优点是性能最高,吞吐连最大
6.2 确认消息发送
6.3 批量消息发送
把消息放到一个集合里统一进行提交,期望在一个会话中,比如投掷到ThreadLocal里的集合,然后拥有相同会话ID,并且带有这次提交信息的size等相关属性。最重要的一点是要把这批消息进行合并,对于Channel而言,就是发送一次消息。这种方式也是在消费的时候,可以批量进行消费。但不保证可靠性,需要进行补偿机制。
6.4 延迟消息发送
就是在Message封装的时候添加delayTime属性
6.5 顺序消息
- 发送的顺序消息,必须保障消息投递到同一个队列,且这个队列只能有一个消费者。
- 之后需要统一提交,并且所有消息的会话ID一致。
- 添加消息属性:顺序标记的序号、和本次顺序消息的Size属性,进行落库操作。
- 并行进行发送给自身的延迟消息(注意带上关键属性:会话ID,size)进行后续处理消费。
- 当收到延迟消息后,根据会话ID,size抽取数据库进行处理即可。
- 定时轮询笔畅机制,对于异常情况。
6.6 消息的幂等性
可能导致消息出现非幂等:
- 可靠性消息投递机制
- MQ Broker服务与消费端传输过程中的网络抖动
- 消费端故障异常