消息队列:RabbitMQ概述

RabbitMQ

RabbitMQ是一个消息代理。它的核心思想非常简单:接收并转发消息。你可以把它想象成一个邮局:当你把邮件丢进邮箱时,你非常确定邮递员先生会把它送到收件人手中。在这个比喻中,RabbitMQ就是邮箱、邮局和邮递员。

概述

是什么

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

应用

适用性

  • 用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量还在其次。

概念

img

通常我们谈到队列服务, 会有三个概念:发消息者、队列、收消息者,RabbitMQ在这个基本概念之上,多做了一层抽象,在发消息者和队列之间,加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系,转而变成发消息者把消息给交换器,交换器根据调度策略再把消息再给队列。

上图中有几个重要概念:

  • Broker:简单来说就是消息队列服务器实体。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    • Exchange用于转发消息,但是它不会做存储 ,如果没有Queue bind到Exchange的话,它会直接丢弃掉Producer发送过来的消息。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。
    • 也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
  • Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
    • 消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    • 一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

Broker

消息中间件的服务节点。一个Broker可以简单看作一个RabbitMQ服务节点或RabbitMQ服务实例。

Producer

消息的生产者。生产者创建消息,然后发布到RabbitMQ当中,消息一般可以包含2个部分:消息体(payload)和标签(label)。

  • payload。一般是一个带有业务逻辑结构的数据,例如一个JSON字符串。
  • label。用于表述这条消息,例如一个交换器名称和一个路由键,RabbitMQ根据标签将消息发送到感兴趣的Consumer。

Consumer

消费者即接收消息的一方,消费者连接到RabbitMQ服务器,并订阅到队列上,当消费者消费一条消息时,只是消费消息的消息体。

在消息路由的过程中,消息的标签被丢弃,存入到队列中的消息只有消息体,消费者也只会消费消息体,即不知道消息的生产者是谁。

Exchange、Routing Key、Binding

Exchange:当生产者期望将消息投递到队列中,消息首先会被发送到Exchange当中,由交换器将消息路由到一个或多个队列中,如果路由不到,则可能返回给生产者或丢弃。

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。

Exchange 参数

1
2
3
4
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,boolean durable,
boolean autoDelete,boolean internal,
Map<String,Object> arguments)throws IOException;
  • exchange:交换器名称。
  • type:Exchange Type。
  • durable:是否持久化,持久化可将交换器存盘,在服务器重启时也不会丢失相关信息。
  • autoDelete:是否自动删除的,自动删除的前提是至少有应该队列或者交换器与该交换器绑定,之后所有与这个交换器绑定的都与该交换器解绑。
  • internal:是否内置的,对于内置的交换器,客户端无法直接发送消息到这个交换器中。
  • argument:其他的一些结构化的参数。

有其他重载的方法例如exchangeDeclareNoWait()方法多设置了一个nowait参数,指不需要服务器返回。而常规方法在声明交换器后需要等待服务器返回Exchange.Declare-Ok。

Routing Key:路由键,生产者将消息发送到Exchange时,一般会指定一个Routing Key,用于指定消息的路由规则。Routing Key需要与Exchange Type和Binding联合使用才有效。

Binding:绑定,RabbitMQ通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就知道如何正确地将消息路由到队列中。

Exchange Bind参数

1
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

Exchange Type

交换机有四种类型:Direct,Topic,Headers and Fanout:

  • Direct:direct类型的行为是”先匹配,再投送”。即在绑定时设定一个routing key, 消息的routing key匹配时, 才会被交换器投送到绑定的队列中去。
  • Topic:按规则转发消息(最灵活)。
  • Headers:设置header attribute参数类型的交换机。
  • Fanout:转发消息到所有绑定队列。

Direct Exchange

Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

img

第一个X - Q1就有一个binding key,名字为orange;X - Q2就有2个binding key,名字为black和green。当消息中的routing Key和这个binding key对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么X到Q2要有black,green2个binding key呢,一个不就行了吗?

这个主要是因为可能又有Q3,而Q3只接受black的信息,而Q2不仅接受black的信息,还接受green的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说agreements.us,或者 agreements.eu.stockholm等。
  • 路由模式必须包含一个星号(*),主要用于匹配路由键指定位置的一个单词
    • 一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。
  • 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:

1
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic和direct类似, 只是匹配上支持了”模式”, 在”点分”的routing key形式中,可以使用两个通配符:

  • *表示一个词。
  • #表示零个或多个词。

Headers Exchange

headers也是根据规则匹配,相较于direct和topic固定地使用routing key,headers则是一个自定义匹配规则的类型,在队列与交换器绑定时,会设定一组键值对规则,消息中也包括一组键值对(headers属性),当这些键值对有一对,或全部匹配时,消息被投送到对应队列。

Fanout Exchange

Fanout Exchange消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing key会被忽略。

Queue

Queue是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息都只能存储在队列中

多个Consumer可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者处理,而不是每个消费者都收到所有的消息并处理。

RabbitMQ不支持队列层面的广播消费,如果需要则需要二次开发,并且处理逻辑会更加复杂。

queueDeclare 参数

  • quque:队列的名称。
  • durable:是否持久化,持久化的队列会存盘。
  • exclusive:是否排他,若是则只对首次声明它的Connection可见,注意不是Channel,并在连接断开时自动删除。
    • 适用于一个客户端同时发送和读取消息的场景。
  • autoDelete:是否自动删除的。
  • arguments:设置其他的一些参数。

queueBind 参数

1
Queue.BindOk queueBind(String queue,String exchange,String routingKey,Map<String,Object> arguments);
  • queue。队列名称。
  • exchange。交换器名称
  • routingKey。用来绑定队列和交换器的路由键。
  • argument。定义绑定的一些参数。

Connection、Channel

Producer与Consumer都需要与Broker建立连接,即一个TCP连接,即Connection。

当TCP连接建立后,客户端紧接着可以创建一个AMQP Channel,每个Channel会被指派一个唯一ID,Channel是建立在Connection上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过Channel完成的。

1567950699481

Channel特点

  • Channel使用了NIO,对于单点,只需要建立一个TCP连接即可,减少了性能开销,便于管理。
  • Channel保证了每个线程的私密性,如同独立的连接一样,当Channel的流量不是很大时,可以有效节省TCP连接资源。
  • 当Channel本身流量很大,此时使用一个Connection可能导致性能瓶颈,此时应该开辟多个Connection。
  • 如果公用Channel,则可能导致网络上出现错误的通信帧交错,也会影响发送方Confirm机制的运行。即Channel非线程安全。

Channel模式

一个Channel只能承接一个会话任务,因此一个Channel不能既负责消费又负责生产。

  • 传输模式:
  • 订阅模式:

协作

Producer将消息存入Broker,Consumer从Broker中消费数据的整个流程:

1567944190719

  • Producer将业务方数据进行包装,封装为消息,发送(在AMQP中是Basic.Publish)到Broker中。
  • Consumer订阅并接收消息(Basic.Consume或Basic.Get),经过解包得到原始数据。
  • Consumer进行业务处理逻辑。

RabbitMQ运转流程

生产者发送消息:

  • Producer连接到Broker,建立一个Connection,开启一个信道Channel。
  • Producer声明一个Exchange,并设置相关的属性,例如交换机类型、释放持久化等。
  • Producer声明一个Queue并设置相关属性,比如是否排他、是否持久化、是否自动删除等。
  • Producer通过路由键将交换器和队列绑定起来。
  • Producer发送消息至Broker,其中包含Routing Key、Exchange等信息。
  • 相应的Exchange根据接收到的Routing Key查找相匹配的Queue。
    • 如果找到,则将从Producer发送过来的消息存入相应的Queue。
    • 如果没有找到,则根据Producer配置的属性选择丢弃还是回退给Producer。
  • 关闭Channel。
  • 关闭Connection。

Consumer接收消息:

  • Consumer连接到Broker,建立一个Connection,开启一个Channel。
  • Consumer向Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  • 等待Broker回应并投递相应队列中的消息,Consumer接收消息。
  • Consumer ack接收到的消息。
  • RabbitMQ从队列中删除相应已经被确认的消息。
  • 关闭Channel。
  • 关闭Connection。

权衡

优点

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  • 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
  • 多语言客户端:RabbitMQ几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
  • 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
  • 插件机制: RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

缺陷

实现

何时创建队列

RabbitMQ当中队列是真正耗费服务器性能的地方,因此衡量RabbitMQ的QPS只需要看队列即可。实际应用中,需要对创建的队列的流量、内存占用、网卡占用有一个清晰的认知,预估其均值与峰值,以便在固定硬件资源的情况下能够进行合理的分配。

  • 生产者与消费者创建。即在使用队列时,生产者与消费者都应该尝试创建(声明操作)队列。
  • 预创建队列。如果设计初已经充分预估了队列的使用情况,则完全可以在业务上线前预先在服务器上创建好。
    • 预创建资源可以确保交换器与队列间正确地绑定匹配。
    • 预先创建可以对业务代码无干扰,方便迁移。

发送消息

1
void basicPublish(String exchange,String routingKey,boolean mandatory,boolean immediate,BasicProperties props,byte[] body)throws IOException
  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。
    • 如果设置为空则消息会发送到RabbitMQ默认的交换器中。
  • routingKey:路由键,交换器根据routingKey将消息存储到相应的队列中。
  • props:消息的基本属性集。
    • contentType。
    • contentEncoding。
    • headers(Map<String,Object>)。
    • deliveryMode。
    • priority。
    • correlationId。
    • replyTo。
    • expiration。
    • messageId。
    • timestamp。
    • type。
    • userId。
    • appId。
    • clusterId。
  • byte[] body:消息体。

消费消息

消费消息有两种模式:

  • 推模式。Basic.Consume。
    • 适用于批量消费,数量限制于Basic.Qos。
  • 拉模式。Basic.Get。
    • 适用于单条消费。

推模式,当调用与Consumer相关的API方法时,不同的订阅采用不同的consumerTag来区分彼此,同一个Channel中的消费者也需要通过唯一的consumerTag以作区分。

1
String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException;
  • queue:队列名称。
  • autoAck:设置是否自动确认。
  • consumerTag:消费者标签,用于区分多个消费者。
  • noLocal:设置为true则表示不能将同一个Connection中Producer发送的消息传送给这个Connection中的Consumer。
  • exclusive:设置是否排他。
  • arguments:设置消费者的其他参数。
  • callback:设置消费者的回调函数,用来处理RabbitMQ推送过来的消息。

basic.Consume将Channel置为接收模式,直到取消队列的订阅为止,在接收模式中,MQ会不断推送消息给消费者,但是数量收到Basic.Qos限制。

拉模式,通过channel.basicGet可以单条地获取消息,返回值是GetReponse。

1
GetReponse basicGet(String queue,boolean autoAck) throws IOException
  • queue:队列名称。
  • autoAck:是否自动确认。

AMQP

AMQP解决了什么问题,或者说它的应用场景是什么?

对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:

  1. 信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
  2. 如何降低发送者和接收者的耦合度?
  3. 如何让Priority高的接收者先接到数据?
  4. 如何做到load balance?有效均衡接收者的负载?
  5. 如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。
  6. 如何做到可扩展,甚至将这个通信模块发到cluster上?
  7. 如何保证接收者接收到了完整,正确的数据?

AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。

概述

AMQP协议本身包括三层:

  • Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。
    • 例如Basic.Consume,Queue.Declare。
  • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务器的应答返回给客户端,主要为客户端与服务器间的通信提供可靠性同步机制和错误处理。
  • Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误监测和数据表示等。

从计算机网络方面,AMQP是应用层协议,其填充TCP协议的数据部分。

AMQP通过协议命令进行交互的,AMQP协议可以看作一系列结构化命令的集合。可类比HTTP方法,如GET、POST。

命令

名称 内容体 客户端方法 描述
Connection.Start factory.newConnection 建立连接相关
Connection.Close connection.close 关闭连接
Channel.Open connection.openChannel 开启信道
Channel.Close channel.close 关闭信道
Exchange.Declare channel.exchangeDeclare 声明交换器
Exchange.Delete 删除交换器
Exchange.Bind 交换器与交换器绑定
Exchange.UnBind 交换器与交换器解绑
Queue.Declare 声明队列
Queue.Bind 队列与交换器绑定
Queue.Purge 清除队列中的内容
Queue.Delete 删除队列
Queue.Unbind 队列与交换器解绑
Basic.Qos 设置未被确认消费的个数
Basic.Consume 消费消息,推模式
Basic.Cancel 取消
Basic.Publish 发送消息
Basic.Return 未能成功路由的消息返回
Basic.Deliver Broker推送消息模式
Basic.Get 消费消息,拉模式
Basic.Ack 确认
Basic.Reject 单条拒绝
Basic.Recover 请求Broker重新发送未被确认的消息
Basic.Nack 拒绝,可批量化
Tx.Select 事务开启
Tx.Commit 事务提交
Tx.Rollback 事务回滚
Confirm.Select 开启发送端确认模式

实现

AMQP Producer流转过程

1
2
3
4
5
6
7
8
9
Connection connection = factory .newConnection() ; //创建连接 
Channel channel = connection.createChannel() ; //创建信道
String message = "Hello World! ";
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close() ;
connection .close();
  • 当客户端希望与Broker建立连接时,会调用factory .newConnection(),方法会进一步封装未Protocol Header 0-9-1的报文头发送给Broker,以通知Broker本次交互采用的时AMQP 0-9-1协议。
  • 客户端调用connection.createChannel(),其包装Channel.Open命令发送给Broker,等待Channel.Open-Ok
  • 客户端发送消息需要调用channel.basicPublish(),对应Basic.Pubilsh,命令内部包含Content Header和Content Body。
    • Content Header包含消息体的属性,例如投递模式、优先级等。
  • 客户端关闭资源。

1567956900902

AMQP Consumer流转过程

1
2
3
4
5
6
7
8
Connection connection = factory.newConnection(addresses);//创建连接 
final Channel channel = connection.createChannel() ; //创建信道
Consumer consumer = new DefaultConsumer(channel) ();//_省略实现
channel .basicQos(64) ;
channel.basicConsume(QUEUE NAME , consumer) ; //等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5) ;
channel.close();
connection.close();
  • 如果在消费前调用了channel.basicQos(int prefetchCount)设置消费者客户端最大能保持的未确认消息数,则会涉及Basic.Qos等命令。
  • 当真正消费前,客户端需要向Broker发送Basic.Consume将Channel设置未接收模式。
  • Broker回执Basic.Consume-Ok告诉Consumer准备好消费消息。
  • Broker向Consumer Push消息,即Basic.Deliver,该消息会携带Content Header与Content Body。
  • 消费者接收到消息并正确消费后,向Broker发送确认,即Basic.Ack
  • 停止消费,关闭连接。

1567957371388

参考

  1. RabbitMQ教程
  2. 消费者致谢和生产者确认