RabbitMQ 核心概念及简单教程

一、RabbitMQ

首先RabbitMQ是基于AMQP协议模型的一个消息队列项目。主要着重于消息的准确传递。

如果项目所需要的队列是针对吞吐量的队列,如日志收集等,可以用Kafka进行设计。KAFKA追求高吞吐量,对消息的重复、丢失、错误没有严格要求,追求的是性能。

二、RabbitMQ核心概念

Server:又称Broker,接受客户端的连接,实现AMQP实体服务。

Connection:连接,应用程序与Broker的网络连接。

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。

Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容。

Virtual host:虚拟地址(一般用于隔离项目),用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。

Exchange:交换机,接收消息,根据路由转发消息到绑定的队列。

Binding:绑定,Exchange和Queue之间的虚拟连接,binding中可以包含routing key。

Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ数据存储方式:RabbitMQ有两种数据存储方式,一种是disk硬盘存储,一种是RAM内存存储。


2.1 交换机属性:

Name:交换机名称

Type:交换机类型direct、topic、fanout、headers

  • Driect:所有发送到DirectExchange的消息被转发到RouteKey中指定的Queue。RoutKey必须完全匹配才能发送到指定Queue。
  • Topic:所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上。(Echange将RouteKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic)
  • Fanout:不处理任何路由键,只简单的将队列绑定到交换机上。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

Durability:是否持久化,True为持久化

AutoDelete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False

Arguments:拓展参数,用于拓展AMQP协议自制。


2.2 消息队列属性:

Durability:是否持久化,True为持久化

AutoDelete:当最后一个监听删除后,自动删除该Queue


2.3 消息队列属性:

本质由Properties和Payload(Body)组成

  • Delivery mode
  • headers(可以自定义属性)
  • content_type、content_encoding、priority
  • correlation_id、reply_to(消息发送失败后回到哪个队列)、expiration(消息过期时间)、message_id(消息的ID)

2.4 RabbitMQ消息如何流转

  • 生产者发送消息到RabbitMQ(指定某个Exchange和Routing key)
  • Message发送到某个Exchange
  • 根据路由规则,Exchange将消息发送到某一个或几个Message Queue。
  • 由监听相应Message Queue的消费者处理消息。


2.5 Confirm确认消息实现

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。生产者收到应答,来确认消息是否正常的发送到Broker。

如何实现Confirm确认消息

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
  3. 重写addConfirmListener两个方法,handleNack 失败处理和handleAck成功处理
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------no ack!-----------");
    }

    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
    }
});

2.6 消费端限流

RabbitMQ提供了一种QOS功能,在非自动确认消息的前提下,如果一定数目的消息(通过基于Consume或者channel设置QOS的值)未被确认前,不进行消费新的消息。

  • prefetchSize:0
  • prefetchCount:一般设置为1,即一旦有1个消息还没有ACK,则该Consumer将block掉,直到有消息ACK
  • global:true/false是否将上面设置应用于channel。简单点说,就是上面限制是channel级别的还是consumer级别。

注意:RabbitMQ对perfetchSize没有实现,global这一项对channel也没有实现。


2.7 Return消息机制

如果发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这时候如果需要监听这种不可达的消息,就要使用Return Listener。

channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        System.out.println("handler return");
        System.out.println(s);
        System.out.println(s1);
        System.out.println(s2);
        System.out.println(bytes);
    }
});

//第三个消息参数需要设置为true
channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());

2.8 ReQueen机制

在消息处理的时候,如果手动签收,返回NACK的话,消息就会重新回到队列重新消费。


2.9 TTL队列/消息

TTL:Time To Live,生存时间。即RabbitMQ可以设置消息的国企时间。


2.10 死信队列

DLX:Dead Letter Exchange。

利用DLX,当消息在一个队列中变成死信的时候,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

死信:

  1. 消息被拒绝(basic.reject/basic.nack)而且requeue为false。
  2. 消息TTL过期。
  3. 队列达到最大长度。

死信队列设置:

设置一个Exchange和Queue,RoutingKey:#。

之后正常使用中,队列加上一个参数

arguments.put(“x-dead-leter-exchange”,”死信Exchange”)

2.11 MQ推拉模式

https://www.cnblogs.com/gordonkong/p/6939046.html


三、RabbitMQ整合SpringBoot

convertAndSend()函数,传参分析:

  • String exchange:指定交换机
  • String routingKey:路由键
  • Object message:发送的对象
  • correlationData:消息唯一ID

onOrderMessage()函数,传参分析:

  • @Payload Order order:传送的消息体
  • @Headers Map<String,Object> headers:Properties各种属性
  • Channel channel:通道(由于在配置时选用manual即手工签收引起的,否则不用写)

3.1 消息发送方

@Controller
@RequestMapping("/test")
public class OrderSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/test")
    @ResponseBody
    public String send(Order order)throws Exception{
        CorrelationData correlationData=new CorrelationData();
        correlationData.setId(order.getMessageId());
        rabbitTemplate.convertAndSend(
                "order-exchange",
                "order.abcd",
                order,
                correlationData
        );
        return "";
    }
}

3.2 消息接收方

由于SpringBoot整合了RabbitMQ的相当多功能,所以可以通过注解的形式,来进行相关Listener的注册工作。并且在注解完成后,RabbitMQ可以自动的生成相应的Exchange、Queue及RoutingKey。

@Component
public class OrderComsumer {
    @RabbitListener(
            bindings = @QueueBinding(
                    value=@Queue(value = "order-queue",durable = "true"),
                    exchange = @Exchange(name = "order-exchange",durable = "true",type = "topic"),
                    key = "order.*"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel)throws  Exception{
        System.out.println("收到消息,开始消费");
        System.out.println(order.getId());
        System.out.println(order.getName());
        Long deliveryTag=(long)headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false);
    }

}

四、RabbitMQ消息可靠性传递的方案

第一种方案,大多数厂商应用场景

  1. 业务数据(BIZ DB)(传送的数据)入库、消息操作记录数据入库(MSG DB)入库。(由于大并发,不要加事务)
  2. 消息发送给RabbitMQ服务。
  3. 消息成功后,通知生产端。生产段可以定时拉取没有回复的消息,进行重发。
  4. 投递成功后在MSG DB相应状态改为成功。
  5. 定时任务拉取MQ Broker没有回复的数据。
  6. 对数据进行重传。
  7. 对消息可以设置最大重传次数,如果超过该次数,将MSG DB相应改为发送失败。

第二种方案,最大限度节省数据信息入库操作

  1. 生产端发送信息到RabbitMQ,将消息入库、入库完成才发消息。
  2. 紧接着第一次发送消息,延迟第二次发送消息
  3. 消费端消费消息
  4. 发送确认消息,是一条真正的消息,不是ACK
  5. Callback Service监听确认消息,将消息存入数据库
  6. 监听延迟发送的队列,五分钟后收到二次发送的消息,检查数据库中相应的数据是否存在,存在则正常。
  7. 若不正常,发起RPC通信,通知发送端重新发送。

五、RabbitMQ集群

5.1 主备模式

实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常的好用且简单。主备模式也成为Warren模式。(主节点挂掉后,从节点提供服务)。

5.2 远程模式

可以实现双活的一种模式,简称Shovel模式,所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,我们可以跨地域的让两个MQ集群互联。远距离的通信和复制,所谓Shovel就是可以把消息进行不同数据中心的复制工作,可以跨地域的让两个MQ集群互联。

5.3 镜像模式

非常经典的Mirror镜像模式,保证100%数据不丢失。为了保证RabbitMQ数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲2-3个节点实现数据同步(对于100%可靠性解决方案一般是3个节点)

5.4 多活模式

实现异地数据复制的主流模式,Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用双活或者多活模型来实现。该模式以来RabbitMQ的federation插件,可以实现持续的可靠的AMQP数据通信。实现该模式,在多套数据中心各部署一套RabbitMQ集群,各中心的RabbitMQ服务除了需要为业务提供正常的消息服务外,中心之间还需实现部分消息共享。

六、实际应用中的一些消息场景

6.1 迅速消息发送场景

迅速消息指不进行落库存储,不做可靠性的保障。在非核心消息、日志数据、或统计分析等场景下比较合适。优点是性能最高,吞吐连最大

6.2 确认消息发送

之前的——RabbitMQ消息可靠性传递的方案

6.3 批量消息发送

把消息放到一个集合里统一进行提交,期望在一个会话中,比如投掷到ThreadLocal里的集合,然后拥有相同会话ID,并且带有这次提交信息的size等相关属性。最重要的一点是要把这批消息进行合并,对于Channel而言,就是发送一次消息。这种方式也是在消费的时候,可以批量进行消费。但不保证可靠性,需要进行补偿机制。

6.4 延迟消息发送

就是在Message封装的时候添加delayTime属性

6.5 顺序消息

  1. 发送的顺序消息,必须保障消息投递到同一个队列,且这个队列只能有一个消费者。
  2. 之后需要统一提交,并且所有消息的会话ID一致。
  3. 添加消息属性:顺序标记的序号、和本次顺序消息的Size属性,进行落库操作。
  4. 并行进行发送给自身的延迟消息(注意带上关键属性:会话ID,size)进行后续处理消费。
  5. 当收到延迟消息后,根据会话ID,size抽取数据库进行处理即可。
  6. 定时轮询笔畅机制,对于异常情况。

6.6 消息的幂等性

可能导致消息出现非幂等:

  1. 可靠性消息投递机制
  2. MQ Broker服务与消费端传输过程中的网络抖动
  3. 消费端故障异常