消息队列:概述

消息队列介绍

消息队列特性

  • 业务无关,只做消息分发
  • FIFO先投递先到达
  • 容灾:节点的动态增删和消息的持久化
  • 性能:吞吐量提升,系统内部通信效率会提高

概述

是什么

消息是指在应用间传递数据,可以是简单的字符串、JSON等,也可以是内嵌对象。

MQ是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

分类,各个分类是什么

  • RabbitMQ。RabbitMQ是采用Erlang语言实现AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,
  • Kafka。
  • ActiveMQ。
  • RocketMQ。

MQ的传递模式

  • 点对点P2P模式:
    • 基于队列,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能。
  • 发布订阅模式:
    • 定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic,主题可以任务是消息传递的中介。
    • 消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。
    • 主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,其在消息的一对多广播时采用。

详细对比

1567999190890

1567999225460

1567999242072

1567999268002

应用

适用性

  • 提供了以松散耦合的灵活方式集成应用程序的一种机制。
    • 提供了基于存储和转发的应用程序间异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。
    • 提供了有保障的消息发送,无需了解RPC和网络通信协议细节。
  • 需要可靠的数据传送的分布式环境。
  • 解耦。提供了以松散耦合的灵活方式集成应用程序的一种机制。
    • 应用程序只关注自己系统的核心流程。
    • 提供了基于存储和转发的应用程序间异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。
    • 提供了有保障的消息发送,无需了解RPC和网络通信协议细节。
  • 扩展性。
    • 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
  • 冗余存储。某些情况下处理数据的过程会失败,消息中间件可以把数据持久化直到它们以及被完全处理。
    • 规避了数据丢失风险,如果要删除消息,需要你的处理系统明确指出该消息以及被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
  • 削峰。在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。
    • 如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯。
  • 可恢复性。当系统一部分组件失效时,不会影响到整个系统。
    • 消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
  • 顺序保证。
    • 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
  • 缓冲。在任何重要的系统中,都会存在需要不同处理时间的元素。
    • 消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制 和优化数据流经过系统的速度。
  • 异步通信。
    • 在很多时候应用不想也不需要立即处理消息。 消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。
  • 最终一致性。
    • 两个系统的状态保持一致,利用记录和补偿实现。

缺陷

应用场景

异步处理

场景说明:用户注册后,需要发送注册邮件和发送注册信息,传统的做法有两种:串行方式、并行方式

串行方式

将注册信息写入数据库成功后,发送注册邮件,然后发送注册短信,而所有任务执行完成后,返回信息给客户端

img

并行方式

将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作。而所有任务执行完成后,返回信息给客户端。同串行方式相比,并行方式可以提高执行效率,减少执行时间。

img

上面的比较可以发现,假设三个操作均需要50ms的执行时间,排除网络因素,则最终执行完成,串行方式需要150ms,而并行方式需要100ms。

因为cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100此,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次。

由上可以看出,传统串行和并行的方式会受到系统性能的局限,那么如何解决这个问题?

我们需要引入消息队列,将不是必须的业务逻辑,异步进行处理,由此改造出来的流程为:

img

根据上述的流程,用户的响应时间基本相当于将用户数据写入数据库的时间,发送注册邮件、发送注册短信的消息在写入消息队列后,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,也有此可以将系统吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。

传统的做法为:订单系统调用库存系统的接口。如下图所示:

img

传统方式具有如下缺点:

  1. 假设库存系统访问失败,则订单减少库存失败,导致订单创建失败。
  2. 订单系统同库存系统过度耦合。

如何解决上述的缺点呢?需要引入消息队列,引入消息队列后的架构如下图所示:

img

  • 订单系统:用户下单后,订单系统进行数据持久化处理,然后将消息写入消息队列,返回订单创建成功。
  • 库存系统:使用拉/推的方式,获取下单信息,库存系统根据订单信息,进行库存操作。

假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其后续操作了。由此实现了订单系统与库存系统的应用解耦。

流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制参与活动的人数;
  • 可以缓解短时间内高流量对应用的巨大压力;

流量削锋处理方式系统图如下:

img

  • 服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面;
  • 秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

img

  • 日志采集客户端:负责日志数据采集,定时写受写入Kafka队列;
  • Kafka消息队列:负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

这种架构在实际开发中的应用,可以参照案例:新浪技术分享:我们如何扛下32亿条实时日志的分析处理

img

服务的技术架构设计:

  • Kafka:接收用户日志的消息队列。
  • Logstash:做日志解析,统一成JSON输出给Elasticsearch。
  • Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
  • Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列、聊天室等。

点对点通讯

img
在点对点通讯架构设计中,客户端A和客户端B共用一个消息队列,即可实现消息通讯功能。

聊天室通讯

img

客户端A、客户端B、直至客户端N订阅同一消息队列,进行消息的发布与接收,即可实现聊天通讯方案架构设计。

消息中间件示例

电商系统

img

​ 电商系统架构示意图

消息队列采用高可用、可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket MQ。

  • 应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
  • 扩展流程(发短信、配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
  • 消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

日志收集系统

img

​ 日志收集系统架构示意图
分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。

  • Zookeeper注册中心,提出负载均衡和地址查找服务;
  • 日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列;
  • Kafka集群:接收,路由,存储,转发等消息处理;
  • Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据;

JMS消息服务

讲消息队列就不得不提JMS 。JMS(Java Message Service,Java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

P2P模式

img

​ P2P模式
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

Pub/Sub模式

img

​ Pub/Sub模式
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

  1. 同步
    订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
  2. 异步
    订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

JNDI:Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI在JMS中起到查找和访问发送目标或消息来源的作用。

JMS编程模型

1. ConnectionFactory

创建Connection对象的工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

2. Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

3. Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP Socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

4. Session

Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

5. 消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

6. 消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

7. MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

深入学习JMS对掌握JAVA架构、EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。

参考

  1. 消息队列技术介绍