RocketMQ基础入门

1. RocketMQ 介绍

RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

​ 前身是MetaQ,是阿里研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

​ RocketMQ 是阿里巴巴集团基于高可用分布式集群技术, 自主研发的云正式商用的专业消息中间件, 既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性, 是阿里巴巴双 11 使用的核心产品。

​ RocketMQ 原先阿里巴巴内部使用, 与 2017 年提交到 Apache 基金会成为 Apache 基金会的顶级开源项目, GitHub 代码库链接:https://github.com/apache/rocketmq.git

1.1 RocketMQ 特点

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而kafka无法保证)
  • 支持拉(pull)和推(push)两种消息模式 (Push好理解,比如在消费者端设置Listener回调;而Pull,控制权在于应用,即应用需要主动的调用拉消息方法从Broker获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费))
  • 单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义(RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)
  • 提供 docker 镜像用于隔离测试和云集群部署
  • 提供配置、指标和监控等功能丰富的 Dashboard

1.2 RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持 18 个级别的延迟消息(Kafka 不支持)
  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

2. RocketMQ 基本概念

RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

image-20230922160828309

2.1 NameServer

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

​ NameServer 是整个 RocketMQ 的“大脑” ,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。

2.1.1 NameServer作用

​ 名称服务器(NameServer)用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找 Broker 信息。NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。

​ 每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现NameServer 。

2.1.2 和zk的区别

​ Name Server和ZooKeeper的作用大致是相同的,从宏观上来看,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连,这就需要broker端连接所有的Name Server,运行数据的改动要发送到每一个Name Server来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了Name Server很轻量级,但是broker端就要做更多的东西了。

​ 而ZooKeeper呢,broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成。

2.1.3 高可用保障

​ Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从 NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

​ NameServer 与每台 Broker 服务保持长连接,并间隔 10S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。

2.2 Broker

​ 消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息,它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等,从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 不可以写只可以读。

2.2.1 部署方式

​ Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

​ 从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。

2.2.1.1 单 Master

​ 这种方式一旦 Broker 重启或宕机会导致整个服务不可用,这种方式风险较大,所以显然不建议线上环境使用。

2.2.1.2 多 Master

​ 所有消息服务器都是 Master ,没有 Slave ,这种方式优点是配置简单,单个 Master 宕机或重启维护对应用无影响,缺点是单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。

2.2.1.3 多 Master 多 Slave(异步复制)

​ 每个 Master 配置一个 Slave,所以有多对 Master-Slave,消息采用异步复制方式,主备之间有毫秒级消息延迟,这种方式优点是消息丢失的非常少,且消息实时性不会受影响,Master 宕机后消费者可以继续从 Slave 消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多 Master 方式几乎一样。缺点是 Master 宕机时在磁盘损坏情况下会丢失极少量消息。

2.2.1.4 多 Master 多 Slave(同步复制)

​ 每个 Master 配置一个 Slave,所以有多对 Master-Slave ,消息采用同步复制方式,主备都写成功才返回成功,这种方式优点是数据与服务都没有单点问题,Master 宕机时消息无延迟,服务与数据的可用性非常高,缺点是性能相对异步复制方式略低,发送消息的延迟会略高。

2.2.2 高可用保障

​ 每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server,Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

2.3 生产者(Producer)

也称为消息发布者,负责生产并发送消息至 Topic

​ 生产者向brokers发送由业务应用程序系统生成的消息,RocketMQ提供了发送:同步、异步和单向(one-way)的多种模式

2.3.1 同步发送

​ 同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包,一般用于重要通知消息,例如重要通知邮件、营销短信。

2.3.2 异步发送

​ 异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

2.3.3 单向发送

​ 单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

2.3.4 生产者组

​ 生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起,从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。

2.3.5 高可用保障

​ Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳,Producer完全无状态,可集群部署。

​ Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

​ Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

2.4 消费者(Consumer)

也称为消息订阅者,负责从 Topic 接收并消费消息

消费者从brokers那里拉取信息并将其输入应用程序,在用户应用的角度,提供了两种类型的消费者:

  • Pull:Pull型消费者主动地从brokers那里拉取信息,只要批量拉取到消息,用户应用程序就会启动消费过程
  • Push:Push型消费者封装消息的拉取、消费进度和维护内部的其他工作,将一个在消息到达时执行的回调接口留给终端用户来实现。
2.4.1 消费者组

​ 消费者组(Consumer Group)一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起,消费者组与生产者组类似,都是将相同角色的分组在一起并命名,分组是个很精妙的概念设计,RocketMQ 正是通过这种分组机制,实现了天然的消息负载均衡。

​ 消费消息时通过 Consumer Group 实现了将消息分发到多个消费者服务器实例,比如某个 Topic 有9条消息,其中一个 Consumer Group 有3个实例(3个进程或3台机器),那么每个实例将均摊3条消息,这也意味着我们可以很方便的通过加机器来实现水平扩展。

2.4.2 高可用保障

​ Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳,Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

​ Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

​ Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

​ 当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失,但是一旦master恢复,未同步过去的消息会被最终消费掉。

​ 消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk),任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量),新挂载的消费者对列中拥有commitlog中的所有数据。

2.5 运转流程

上面介绍了RocketMQ的各个角色及其作用, 下面我们看一下各角色之间完整的交互过程。

image-20230922160838018

  1. NameServer 先启动
  2. Broker 启动时向 NameServer 注册
  3. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
  4. NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制, 如果检测超过120S),则从路由注册表中将其移除。
  5. 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定

2.6 名词解释

2.6.1 消息

​ 消息(Message)就是要传输的信息,一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址,一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 key 并在 Broker 上查找此消息以便在开发期间查找问题。

2.6.2 主题

主题(Topic)可以看做消息的规类,它是消息的第一级类型

​ 比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic ,Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息,一个 Topic 也可以被 0个、1个、多个消费者订阅。

2.6.3 标签

标签(Tag)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性

​ 使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识,比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag ,标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

2.6.4 消息队列

消息队列(Message Queue),主题被划分为一个或多个子主题,即消息队列。

​ 一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去,下图 Broker 内部消息情况:

image-20230922160939851

2.6.5 消息消费模式

消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)

​ 默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费

​ 而广播消费消息会发给消费者组中的每一个消费者进行消费。

2.6.6 消息顺序

消息顺序(Message Order)有两种:顺序消费(Orderly)和并行消费(Concurrently)

​ 顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列,并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

3. 设计理念

RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一,主要体现在以下三个方面:

3.1 NameServer设计及其简单

​ RocketMQ摒弃了业界常用的zookeeper作为注册中心,而是使用自研的NameServer来实现元数据的管理,因为Topic路由信息无须在集群间保持强一致性,追求最终一致性,并且能容忍分钟级的不一致,所以RocketMQ的NameServer集群间互不通信,极大降低了设计的复杂度,降低了对网络的要求,提升性能。

3.2 高效的IO存储机制

​ RocketMQ追求消息发送的高吞吐量,RocketMQ消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制。

​ 所有主题的消息存储基于顺序写,提升写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。

3.3 容忍存在的设计缺陷

适当将某些工作下放给RocketMQ使用者,消息中间件的实现者经常会遇到一个难题:

​ 如何保证消息一定能被消息消费者消费,并且保证只消费一次,RocketMQ的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效,消息重复问题由消费者在消息消费时实现幂等。

4. RocketMQ 架构

image-20230922161129212

RocketMQ 架构图中展示了四个集群:

4.1 NameServer 集群

​ 提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。有些其它开源中间件使用 ZooKeeper 实现服务发现及路由功能,如 Apache Kafka。

NameServer是一个功能齐全的服务器,主要包含两个功能:

  1. Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活
  2. 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息

4.2 Broker 集群

通过提供轻量级的 Topic 和Queue 机制处理消息存储。

​ 同时支持推(Push)和拉(Pull)两种模型,包含容错机制。提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。

Broker 有几个重要的子模块:

  1. 远程处理模块,Broker 入口,处理来自客户端的请求
  2. 客户端管理,管理客户端(包括消息生产者和消费者),维护消费者的主题订阅
  3. 存储服务,提供在物理硬盘上存储和查询消息的简单 API
  4. HA 服务,提供主从 Broker 间数据同步
  5. 索引服务,通过指定键为消息建立索引并提供快速消息查询

4.3 Producer 集群

​ 消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。

4.4 Consumer 集群

​ 消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。提供了实时的消息订阅机制,可以满足大多数消费者的需求。

有关架构图中集群间交互方式的说明:

  1. Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync
  2. 每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer
  3. Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取 Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳
  4. Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和提供 Topic 服务的 Master 和 Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

5. 设计目标

5.1 架构模式

​ RocketMQ与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。

5.1.1 顺序消息

顺序消息(FIFO:First Input First Output)是一种严格按照顺序进行发布和消费的消息类型,要求消息的发布和消息消费都按照顺序进行,RocketMQ可以严格保证消息有序

​ RocketMQ可以严格的保证消息有序,但这个顺序不是全局顺序,只是分区(queue)顺序,要全局顺序只能一个分区,但是同一条queue里面,RocketMQ的确是能保证FIFO的。

5.1.2 消息过滤

消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息RocketMQ消息过滤支持在服务端与消费端的消息过滤机制。

  1. 消息在Broker端过滤,Broker只将消息消费者感兴趣的消息发送给消息消费者。
  2. 消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从Broker传输到消费端。
5.1.3 消息存储

消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:

​ 消息堆积能力和消息存储性能,RocketMQ追求消息存储的高性能,引入内存映射机制,所有主题的消息顺序存储在同一个文件中,同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。

5.2 消息高可用性

通常影响消息可靠性的有以下几种情况

  1. Broker正常关机。
  2. Broker异常宕机。
  3. 操作系统宕机。
  4. 机器断电,但是能立即恢复供电情况。
  5. 机器无法开机(可能是CPU、主板、内存等关键设备损坏)。
  6. 磁盘设备损坏。

​ 针对上述情况,情况14的RocketMQ在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息,情况56属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ能保证只丢失少量消息,RocketMQ在后续版本中将引入双写机制,以满足消息可靠性要求极高的场合。

5.2.1 消息到消费低延迟

​ RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

5.2.2 确保消息必须被消费一次

​ RocketMQ通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。

5.2.3 回溯消息

​ 回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息,RocketMQ支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。

5.2.4 消息堆积

​ 消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力,RocketMQ消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用,RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天

5.2.5 定时消息

​ 定时消息是指消息发送到Broker后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意进度的定时消息,而只支持特定延迟级别。(一个说法是任意精度的定时消息会带来性能损耗,但是阿里云版本的 RocketMQ却提供这样的功能,充值收费优先策略? )

5.2.6 消息重试机制

​ 消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。

6. RocketMQ安装

参考: https://baiyp.ren/RocketMQ%E5%AE%89%E8%A3%85.html

7. 工程实例

7.1 Java 访问 RocketMQ 实例

RocketMQ 目前支持 Java、C++、Go 三种语言访问,按惯例以 Java 语言为例看下如何用 RocketMQ 来收发消息的。

7.2 引入依赖

添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可。

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>

7.3 消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Producer {

public static void main(String[] args) throws Exception {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");

//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();

for (int i = 0; i < 100; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
"topic_rocket_example" /* 消息主题名 */,
"TagA" /* 消息标签 */,
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);

//发送消息并返回结果
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
}

// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

​ 示例中用 DefaultMQProducer 类来创建一个消息生产者,通常一个应用创建一个 DefaultMQProducer 对象,所以一般由应用来维护生产者对象,可以其设置为全局对象或者单例。该类构造函数入参 producerGroup 是消息生产者组的名字,无论生产者还是消费者都必须给出 GroupName ,并保证该名字的唯一性,ProducerGroup 发送普通的消息时作用不大。

​ 接下来指定 NameServer 地址和调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

​ 初始化完成后,调用 send 方法发送消息,示例中只是简单的构造了100条同样的消息发送,其实一个 Producer 对象可以发送多个主题多个标签的消息,消息对象的标签可以为空。send 方法是同步调用,只要不抛异常就标识成功。

​ 最后应用退出时调用 shutdown 方法清理资源、关闭网络连接,从服务器上注销自己,通常建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法。

7.4 消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Consumer {

public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topic_rocket_example", "*");

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}

​ 示例中用 DefaultMQPushConsumer 类来创建一个消息消费者,通生产者一样一个应用一般创建一个 DefaultMQPushConsumer 对象,该对象一般由应用来维护,可以其设置为全局对象或者单例。该类构造函数入参 consumerGroup 是消息消费者组的名字,需要保证该名字的唯一性。

​ 接下来指定 NameServer 地址和设置消费者应用程序第一次启动时从队列头部开始消费还是队列尾部开始消费。

​ 接着调用 subscribe 方法给消费者对象订阅指定主题下的消息,该方法第一个参数是主题名,第二个擦书是标签名,示例表示订阅了主题名 topic_example_java 下所有标签的消息。

​ 最主要的是注册消息监听器才能消费消息,示例中用的是 Consumer Push 的方式,即设置监听器回调的方式消费消息,默认监听回调方法中 List 里只有一条消息,可以通过设置参数来批量接收消息。

​ 最后调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

7.5 启动 Name Server

1
2
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

​ RocketMQ 核心的四大组件中 Name Server 和 Broker 都是由 RocketMQ 安装包提供的,所以要启动这两个应用才能提供消息服务。首先启动 Name Server,先确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqnamesrv ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Name Server 的实际运行情况。

7.6 启动 Broker

1
2
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

​ 同样也要确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqbroker ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Broker 的实际运行情况。

RocketMQ深入分析

1. 消息存储

​ 目前的MQ中间件从存储模型来,分为需要持久化和不需要持久化的两种模型,现在大多数的是支持持久化存储的,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ。ZeroMQ却不需要支持持久化存储而业务系统也大多需要MQ有持久存储的能力,这样可以大大增加系统的高可用性。

​ 从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但如果从可靠性的角度出发直接操作文件系统是最低的,而关系型数据库的可靠性是最高的。

1.1 存储介质类型和对比

常用的存储类型分为关系型数据库存储、分布式KV存储 和 文件系统存储

目前的高性能磁盘,顺序写速度可以达到 600MB/s,足以满足一般网卡的传输速度,而磁盘随机读写的速度只有约 100KB/s,与顺序写的性能相差了 6000 倍。故好的消息队列系统都会采用顺序写的方式

关系型数据库存储 分布式KV存储 文件系统存储
简介 选用 JDBC 方式实现消息持久化,只需要简单地配置 xml 即可实现 JDBC 消息存储 kv存储即 Key-Value 型存储中间件,如 Redis 和 RocksDB,将消息存储到这些中间件中 将消息存储到文件系统中
性能 存在性能瓶颈,如mysql在单表数据量达到千万级别的情况下,IO读写性能下降 通过高并发的中间件存储和处理消息,速度必然优于数据库存储方式 将消息刷盘至所部属虚拟化/物理机的文件系统来实现消息持久化,效率更高
可靠性 该方案十分依赖DB,一旦DB出现故障,MQ消息无法落盘存储,从而导致线上故障 相较DB来说更加安全可靠 除非部署 MQ 的机器本身或是本地磁盘挂了,否则一般不会出现无法持久化的问题
项目使用 ActiveMQ Redis、RockDB RocketMQ、Kafaka、RabbitMQ

存储效率:文件系统 > 分布式KV存储 > 关系型数据库DB

开发难度和集成:关系型数据库DB < 分布式KV存储 < 文件系统

1.2 顺序读写和随机读写

顺序读写和随机读写对于机械硬盘来说为什么性能差异巨大

顺序读写 随机读写
文件数目 读取一个大文件 读取多个小文件
比较 顺序读写只读取一个大文件,耗时更少 随机读写需要打开多个文件,写进行多次的寻址和旋转延迟,速率远低于顺序读写
文件预读 顺序读写时磁盘会预读文件,即在读取的起始地址连续读取多个页面,若被预读的页面被使用,则无需再去读取 由于数据不在一起,无法预读
比较 在大并发的情况下,磁盘预读能够免去大量的读操作,处理速度肯定更快 磁盘需要不断的寻址,效率很低
写入数据 写入新文件时,需要寻找磁盘可用空间 写入新文件时,需要寻找磁盘可用空间。但由于一个文件的存储量更小,这个操作触发频率更多
比较 顺序读写创建新文件,只需要创建一个大文件就可以用很久 随机读写可能频繁创建文件。创建文件时需要进行寻找磁盘可用空间等一些列操作,肯定更加耗时

1.3 消息存储机制

由于消息队列有高可靠性的要求,故要对队列中的数据进行持久化存储。

  1. 消息生产者先向 MQ 发送消息
  2. MQ 收到消息,将消息进行持久化,并在存储系统中新增一条记录
  3. 返回ACK(确认字符)给生产者
  4. MQ 推送消息给对应的消费者,等待消费者返回ACK(确认字符,确认消费)
  5. 若这条消息的消费者在等待时间内成功返回ACK,则 MQ 认为消息消费成功,删除存储中的消息
  6. 若 MQ 在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新推送消息

image-20230922161742823

1.4 消息存储设计

​ RocketMQ采用了单一的日志文件,即把同1台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入。

image-20230922161759111

​ 所有消息都存在一个单一的CommitLog文件里面,然后有后台线程异步的同步到ConsumeQueue,再由Consumer进行消费。

​ 这里之所以可以用“异步线程”,也是因为消息队列天生就是用来“缓冲消息”的。只要消息到了CommitLog,发送的消息也就不会丢。只要消息不丢,那就有了“充足的回旋余地”,用一个后台线程慢慢同步到ConsumeQueue,再由Consumer消费。

1.5 消息存储结构

​ 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。RocketMQ 采取一些机制,尽量向 CommitLog 中顺序写,但是随机读。单个文件大小默认1G ,可通过在 broker 置文件中设置 mapedFileSizeCommitLog 属性来改变默认大小。

1.5.1 CommitLog

CommitLog是存储消息内容的存储主体,Producer发送的消息都会顺序写入CommitLog文件

​ CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store$ { commitlog} \ $ { fileName}

​ 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

image-20230922161650160

1.5.2 ConsumeQueue

consumequeue文件可以看成是基于topic的commitlog索引文件。

​ RocketMQ基于主题订阅模式实现消息的消费,消费者关心的是主题下的所有消息。但是由于不同的主题的消息不连续的存储在commitlog文件中,如果只是检索该消息文件可想而知会有多慢,为了提高效率,对应的主题的队列建立了索引文件,为了加快消息的检索和节省磁盘空间,每一个consumequeue条目存储了消息的关键信息commitog文件中的偏移量、消息长度、tag的hashcode值。

image-20230922161428232

1.5.3 IndexFile

​ index 存的是索引文件,用于为生成的索引文件提供访问服务,这个文件用来加快消息查询的速度,通过消息Key值查询消息真正的实体内容。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索 消息的速度 ,使用 Hash 索引机制,具体是 Hash 槽与 Hash 冲突的链表结构。

​ 在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引

image-20230922161432649

1.5.4 Config

config 文件夹中 存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。

  • topics.json : topic 配置属性
  • subscriptionGroup.json :消息消费组配置信息。
  • delayOffset.json :延时消息队列拉取进度。
  • consumerOffset.json :集群消费模式消息消进度。consumerFilter.json :主题消息过滤信息。

2. 消费进度管理

​ 业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的

​ 如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

​ 为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消费失败的消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

2.1 从哪里开始消费

​ 当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度,如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

1
2
3
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

2.2 消息ACK机制

​ RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

image-20230922161148462

2.3 重复消费问题

​ 这定时方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

1
![image-20201225112140154](./../../../../../工作台/笔记/04_数据存储/消息中间件/RocketMQ/RocketMQ/image-20201225112140154.png)

​ 在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

​ 在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

​ 对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

2.4 重复消费验证

2.4.1 查看当前消费进度

检查队列消费的当前进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
cat consumerOffset.json
{
"offsetTable": {
"topicTest@rocket_test_consumer_group": {
0: 33,
1: 32,
2: 32,
3: 33
},
"%RETRY%rocket_test_consumer_group@rocket_test_consumer_group": {
0: 6
}
}
}

通过consumerOffset.json我们可以知道当前topicTest主题的queue0消费到偏移量为28

2.4.2 消费者发送消息

消费者发送消息,并查看各个队列消息的偏移量

1
2
3
4
5
6
7
8
9
10
发送queueId:[2],偏移量offset:[32],发送状态:[SEND_OK]
发送queueId:[3],偏移量offset:[33],发送状态:[SEND_OK]
发送queueId:[0],偏移量offset:[33],发送状态:[SEND_OK]
发送queueId:[1],偏移量offset:[32],发送状态:[SEND_OK]
发送queueId:[2],偏移量offset:[33],发送状态:[SEND_OK]
发送queueId:[3],偏移量offset:[34],发送状态:[SEND_OK]
发送queueId:[0],偏移量offset:[34],发送状态:[SEND_OK]
发送queueId:[1],偏移量offset:[33],发送状态:[SEND_OK]
发送queueId:[2],偏移量offset:[34],发送状态:[SEND_OK]
发送queueId:[3],偏移量offset:[35],发送状态:[SEND_OK]

我们发现队列2的偏移量最小为29

消费的时候最小偏移量不提交,其他都正常

1
2
3
4
5
6
7
8
9
10
//队列2的偏移量为29的数据在等待
if (ext.getQueueId() == 2 && ext.getQueueOffset() == 29) {
System.out.println("消息消费耗时较厂接收queueId:[" + ext.getQueueId() + "],偏移量offset:[" + ext.getQueueOffset() + "]");
//等待 模拟假死状态
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

运行查看日志

1
2
3
4
5
6
7
8
9
10
接收queueId:[3],偏移量offset:[34],接收时间:[1608880793658],消息=[Hello Java demo RocketMQ 5]
接收queueId:[3],偏移量offset:[33],接收时间:[1608880793658],消息=[Hello Java demo RocketMQ 1]
接收queueId:[3],偏移量offset:[35],接收时间:[1608880793658],消息=[Hello Java demo RocketMQ 9]
接收queueId:[1],偏移量offset:[32],接收时间:[1608880794684],消息=[Hello Java demo RocketMQ 3]
接收queueId:[0],偏移量offset:[34],接收时间:[1608880794687],消息=[Hello Java demo RocketMQ 6]
接收queueId:[1],偏移量offset:[33],接收时间:[1608880794685],消息=[Hello Java demo RocketMQ 7]
接收queueId:[2],偏移量offset:[33],接收时间:[1608880794689],消息=[Hello Java demo RocketMQ 4]
接收queueId:[0],偏移量offset:[33],接收时间:[1608880794685],消息=[Hello Java demo RocketMQ 2]
模拟服务宕机无法确认消息,接收queueId:[2],偏移量offset:[32]
接收queueId:[2],偏移量offset:[34],接收时间:[1608880794691],消息=[Hello Java demo RocketMQ 8]

我们发现只有队列2的偏移量为29的消息消费超时,其他都已经正常消费

我们再查看下consumerOffset.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
cat consumerOffset.jso
{
"offsetTable": {
"topicTest@rocket_test_consumer_group": {
0: 33,
1: 32,
2: 32,
3: 33
},
"%RETRY%rocket_test_consumer_group@rocket_test_consumer_group": {
0: 6
}
}
}

我们发现rocketMQ 整个消费记录都没有被提交,所以下次消费会全部再次消费

2.4.3 再次消费

去掉延时代码继续消费

1
2
3
4
5
6
7
8
9
10
接收queueId:[3],偏移量offset:[35],接收时间:[1608880958530],消息=[Hello Java demo RocketMQ 9]
接收queueId:[3],偏移量offset:[33],接收时间:[1608880958530],消息=[Hello Java demo RocketMQ 1]
接收queueId:[3],偏移量offset:[34],接收时间:[1608880958530],消息=[Hello Java demo RocketMQ 5]
接收queueId:[2],偏移量offset:[32],接收时间:[1608880959539],消息=[Hello Java demo RocketMQ 0]
接收queueId:[2],偏移量offset:[33],接收时间:[1608880959560],消息=[Hello Java demo RocketMQ 4]
接收queueId:[0],偏移量offset:[33],接收时间:[1608880959561],消息=[Hello Java demo RocketMQ 2]
接收queueId:[2],偏移量offset:[34],接收时间:[1608880959561],消息=[Hello Java demo RocketMQ 8]
接收queueId:[1],偏移量offset:[32],接收时间:[1608880959564],消息=[Hello Java demo RocketMQ 3]
接收queueId:[1],偏移量offset:[33],接收时间:[1608880959564],消息=[Hello Java demo RocketMQ 7]
接收queueId:[0],偏移量offset:[34],接收时间:[1608880959566],消息=[Hello Java demo RocketMQ 6]

我们发现消息被重复消费了一遍

3. 文件刷盘机制

RocketMQ 的消息是存储在磁盘上的,这样做有两个优点:

  • 保证断电后恢复

  • 让存储的消息量超出内存的限制

​ RocketMQ 存储与读写是基于 JDK NIO 的内存映射机制,具体使用 MappedByteBuffer(基于 MappedByteBuffer 操作大文件的方式,其读写性能极高)RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息 超出内存的限制 RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写 消息在通过 Producer 写人 RocketMQ 的时候,有两种写磁盘方式:

image-20230922161204008

3.1 同步刷盘方式

​ 如上图所示,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。

3.2 异步刷盘

​ 能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执行。

3.3 刷盘方式对比

同步刷盘 异步刷盘
消息情况 在返回写成功状态时,消息已经被写入磁盘中。即消息被写入内存的PAGECACHE 中后,立刻通知刷新线程刷盘,等待刷盘完成,才会唤醒等待的线程并返回成功状态 在返回写成功状态时,消息可能只是被写入内存的 PAGECACHE 中。当内存的消息量积累到一定程度时,触发写操作快速写入
性能 需要等待刷盘才能返回结果 消息写入内存后立刻返回结果,吞吐量更高
可靠性 可以保持MQ的消息状态和生产者/消费者的消息状态一致 Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

4. 过期文件删除

4.1 为什么删除过期文件

​ 由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。

4.2 删除文件的思路

​ RocketMQ顺序写CommitLog文件、ComsumeQueue文件,所有的写操作都会落到最后一个文件上,因此在当前写文件之前的文件将不会有数据插入,也就不会有任何变动,因此可通过时间来做判断,比如超过72小时未更新的文件将会被删除

注意:RocketMQ删除过期文件时不会关注该文件的内容是否全部被消费

4.3 如何防止重复投递

客户端通过broker的消费进度确定自己需要拉取那些消息

​ 消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?

​ 答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

4.4 过期文件删除流程

删除过期文件的整体流程如下:

  • 开启定时任务每10s扫描是否有文件需要删除
  • 有三种情况会进入删除文件操作:到了deleteWhere指定的时间点(默认是凌晨4点)、磁盘不足、手动触发
  • 对于磁盘不足的情况,当磁盘使用率大于磁盘空间警戒线水位(默认是90%),会阻止消息写入,当超过85%时会强制删除文件(需要设置允许强制删除参数,否者不生效),其他两种情况都只能删除过期的文件(文件最后更新时间+文件最大的存活时间 < 当前时间)
  • 当被删除的文件存在引用时,会有一个文件删除缓存时间,在这段时间内,该文件不会被删除,主要是留给引用该文件程序一些时间,当超过了文件删除缓存时间后,每次都会将该文件的引用减少1000,直到减少小于等于0后才释放该文件引用的相关资源,然后将该文件放入一个“文件删除集合”中
  • 一次连续删除文件中间会存在一定的间隔,不会连续释放文件相关的资源
  • 一次连续删除的文件和不大于10
  • 将“文件删除集合”中的文件从

5. 高可用

5.1 NameServer 高可用

​ 由于 NameServer 节点是无状态的,且各个节点直接的数据是一致的,故存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行

5.1.1 BrokerServer 高可用

RocketMQ是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用性的,一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。

当其中一个 Master 出现问题时:

  • 由于Slave只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费
  • 由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费

5.2 消息消费高可用

​ Consumer 的高可用是依赖于 Master-Slave 配置的,由于 Master 能够支持读写消息,Slave 支持读消息,当 Master 不可用或繁忙时, Consumer 会被自动切换到从 Slave 读取(自动切换,无需配置)。故当 Master 的机器故障后,消息仍可从 Slave 中被消费

5.3 消息发送高可用

​ 在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

image-20230922161219989

5.4 消息主从复制

5.4.1 同步复制和异步复制

若一个 Broker 组有一个 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步复制和异步复制两种方式

同步复制 异步复制
概念 即等 Master 和 Slave 均写成功后才反馈给客户端写成功状态 只要 Master 写成功,就反馈客户端写成功状态
可靠性 可靠性高,若 Master 出现故障,Slave 上有全部的备份数据,容易恢复 若 Master 出现故障,可能存在一些数据还没来得及写入 Slave,可能会丢失
效率 由于是同步复制,会增加数据写入延迟,降低系统吞吐量 由于只要写入 Master 即可,故数据写入延迟较低,吞吐量较高
5.4.2 配置方式

可以对 broker 配置文件里的 brokerRole 参数进行设置,提供的值有:

  • ASYNC_MASTER:异步复制
  • SYNC_MASTER:同步复制
  • SLAVE:表明当前是从节点,无需配置 brokerRole
5.4.3 实际应用

在实际应用中,由于同步刷盘方式会频繁触发磁盘写操作,明显降低性能,故通常配置为:

  • 刷盘方式:ASYNC_FLUSH(异步刷盘)
  • 主从复制:SYNC_MASTER(同步复制)

异步刷盘能够避免频繁触发磁盘写操作,除非服务器宕机,否则不会造成消息丢失。

主从同步复制能够保证消息不丢失,即使 Master 节点异常,也能保证 Slave 节点存储所有消息并被正常消费掉。

6. 业务上保障

6.1 为什么从业务上保证

6.1.1 消息丢失问题

​ RocketMQ虽然号称消息不会丢失,但是还是有几率存在MQ宕机以及rocketMQ使用上的问题可能存在消息丢失等,对于类似于支付确认的消息一般来说是一条都不允许丢失的

6.1.2 消息幂等性

在网络环境中,由于网络不稳定等因素,消息队列的消息有可能出现重复,大概有以下几种:

6.1.2.1 发送时消息重复

​ 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

6.1.2.2 投递时消息重复

​ 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

6.1.2.3 负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启

​ 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

​ 结合三种情况,可以发现消息重发的最后结果都是,消费者接收到了重复消息,那么,我们只需要在消费者端统一进行幂等处理就能够实现消息幂等。

6.2 整体架构

上面我们是从MQ本身来保证消息的的可靠性,下面我们从业务上来分析如何保障MQ的可靠性,Mq 的消息成功投递和消费是比较难的,这里提供一个思路,如何保证消息成功投递并且消息是幂等的。

image-20230922161227168

6.2.1 表结构设计

这里面涉及到两张表

6.2.1.1 消息存根

消息发送到 mq 之前先把消息存储到 mongodb 或者 mysql 中,消息有一个存根是为了避免消息在 mq 中丢失后就找不到发送的消息,主要需要保存几个关键信息:

  • messageId: 消息流水号,用来在消息的生产端和消费端串联使用,根据这个id找到消费端和生产端唯一的消息
  • messageContent:消息内容
  • count:发送到 broker 的次数,如果超过特定次数就不往 broker 发
  • status:消息状态,已确认、未确认、已作废,如果消费端已经确认就需要修改该状态
  • offset:偏移量,发送的时候记录消息的偏移量。
  • resendTime:重发时间,需要加上消息可能在队列的堆积时间,否则可能造成消息还未被消费到就被重发了
  • createDatre:记录消息的发送时间
6.2.1.2 消息日志

为了保障消息能够成功发送到MQ,需要在拉取到消息后的第一时间将消息ID保存到消息日志表,用来让发送端知道消息消息是否成功发送到了MQ,需要包含一下字段:

  • messageId: 消息流水号,为了和消息存根来保持一致
  • offset:用来记录当前接收到的消息的偏移量,重发时需要通过统计最大偏移量来确定消费者是否堆积来确定是否重发
  • createDate:用来记录当前消息接收到的时间

6.3 幂等性校验

6.3.1 Redis幂等性校验

首先进行Redis的setNX进行幂等性校验,有以下情况

  • 成功:代表redis幂等性校验通过,为了防止Redis数据误删或者过期,需要进行数据库的幂等性校验,检查消息存根是否是未处理状态,如果是未处理状态则进行业务,则说明消息是未处理状态,否则幂等性校验失败
  • 失败:代表redis幂等性校验未通过,则不通过数据库直接进行校验失败处理
6.3.2 DB幂等性校验

如果Redis校验没有通过,还需要DB进行幂等性校验,有以下情况

  • 成功:说明消息确定时未处理的,需要进行处理
  • 失败:说明Redis缓存已经过期或者误删,这里做兜底处理
6.3.3 校验失败处理

幂等性校验失败则说明消息是重复,存在一下两种情况

6.3.3.1 Redis校验失败

如果直接Redis的setNX校验失败,说明是重复消息,但是这个时候消息是不知道消息是否处理完成,有以下两种情况:

  • 消息处理完成:需要删除消息日志以及返回成功消费的标志
  • 消息未处理完成:这个时候说明已经有一个线程在处理消息了,直接结束并返回成功消费的标志。

正确处理逻辑如下

​ 这个时候的处理业务应该是Redis校验失败后,但是并不能确定消息是否真的已经处理完成还是处理中的消息,需要先检查消息存根状态:

  • 如果消息处理成功需要删除消息日志,返回成功标志。
  • 如果没有处理成功,则说明另一个线程正在处理,直接返回成功标志。
6.3.3.2 DB校验失败

​ 这个时候说明消息已经处理完成了,redis并且已经通过setNX已经设置标志了,需要删除消息日志,并且返回成功标志。

6.3.4 校验成功

​ 幂等性校验成功后就需要处理业务操作了

6.4 业务处理

业务操作分为两种情况:

6.4.1 操作成功

​ 如果操作成功,需要修改消息存根状态,并且删除消息日志,然后返回成功标志

6.4.2 操作失败(异常)

​ 如果操作失败,需要消费端重试,这个时候删除redis的setNX的值,并且返回重试指令,让消费端进行重试,但是重试可能一直不成功,RocketMQ的消费端重试机制,达到上限后会投递到死信队列,后期需要人工处理。

6.5 定时重发消息

6.5.1 重发消息筛选

需要符合一下条件的数据才会被筛选出来

  • 消息存根中未确认的消息
  • 消息存根中发送次数小于最大发送次数的消息
  • 消息存根中下次发送时间大于当前时间的消息
  • 消息存根中的偏移量小于消息日志中的最大偏移量的消息
  • 消息存根中的消息ID在消息日志中不存在的消息
6.5.2 重发消息

符合以上条件的消息需要进行重发,调用MQProducer客户端进行重发消息,重发完成后还需要做一下事情

  • 修改消息的重发次数,当前次数+1
  • 修改消息的下次重发时间,通过规则计算需要间隔多长时间才需要重发
  • 修改消息的偏移量,设置为当前发送消息的的偏移量
6.5.3 人工处理

​ 如果消息的发送次数达到最大的发送次数,将无法进行重发,需要人工进行处理,可以通过发邮件以及其他的方式通知开发人员进行后续的人工处理

RocketMQ高阶使用

1. 能力目标

  • 能够理解和使用RocketMQ的顺序消息
  • 能够理解和使用RocketMQ的生产者保证策略
  • 能够理解和使用RocketMQ的消息投递策略
  • 能够理解和使用RocketMQ的消费者的重试策略

2. 车辆调度

2.1 业务分析

2.1.1 车辆调度分析

image-20230922161234154

​ 用户打车从派单服务到调度服务,首先将消息以顺序方式扔到RocketMQ中,然后消费的事务就会严格按照放入的顺序进行消费,用户首先拿到从RocketMQ推送的顺序消息,然后保持住,开始轮询检查Redis中的List中是否存在车辆,存在两种情况:

2.1.1.1 没有拉取到车辆

​ 如果没有拉取到车辆,然后会延时一段时间,继续进行拉取,一直拉取不到的话一直进行自旋,一直等到拿到车辆才退出自旋。

2.1.1.2 拉取到车辆

​ 如果拉取车辆就会将用户和拿到的车辆绑定到一起,开始后续操作,比如下订单等。

2.1.2 司机自动接单

image-20230922161237746

当司机上线后,开启自动接单后,主题流程图下

  1. 会先将车辆状态设置为在Ready状态,
  2. 当车辆接到用户后会将车辆设置为Running状态,
  3. 用户下车后,会将车辆继续设置为Ready状态,并将车辆push进list
2.1.3 用户下车

image-20230922161241064

​ 如果用户点击下车,主体流程如下

  1. 会先将用户状态设置为Stop状态
  2. 然后会解除车辆和用户的绑定,
  3. 之后车辆会将会push到list的尾端,让其他的用户可以拉取到车辆信息。
2.1.4 用户打车

image-20230922161244709

用户上车后流程如下

  1. 校验用户状态,然后将发送顺序消息到RabbitMQ
  2. 消费者获取到用户消息,开始轮询来拉取车辆信息,如果拉取不到休眠一会继续拉取,一直到拉取到
  3. 拉取到后校验是否超时,如果超时直接结束打车,否则删除RabbitMQ的超时检测Key,失效超时通知
  4. 设置用户状态为Running,后续就到了司机自动接单的流程了

2.2 技术分析

2.2.1 RocketMQ顺序消息

打车需要排队,我们需要让前面的人能够被消费到,不能让这个顺序乱掉,这就需要用到RocketMQ的顺序消息

2.2.2 Redis 轮询队列

我们要让车辆在队列中,从MQ拿到一个车辆后,需要再从队列中拿取一个车辆如果拿不到则需要不断的轮询,一直到拿到车辆为止,如果打车玩完成还是需要将车辆归还队列,让其他的用户来打车,将一辆车重复利用起来

3. 顺序消息

3.1 顺序类型

3.1.1 无序消息

无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。

  • Producer 依次发送 orderId 为 1、2、3 的消息
  • Consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。
3.1.2 全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费

image-20230922161249505

比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。

3.1.3 局部顺序

在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。

image-20230922161253641

​ 就好比一个打车涉及到不同地区 北京上海广州深圳。我不用管其它的订单,只保证同一个地区的订单ID能保证这个顺序就可以了。

3.2 Rocket顺序消息

RocketMQ可以严格的保证消息有序,但这个顺序,不是全局顺序,只是分区(queue)顺序,要全局顺序只能一个分区。

3.2.1 问题梳理

​ 我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了4个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到4个queue,

​ 好比有10条消息,那么这10条消息会平均分配在这4个Queue上,那么每个Queue大概放2个左右,这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则

image-20230922161257018

​ 之所以出现下面这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不同的queue(分区)

3.2.2 解决方案

​ 这个时候思路就来了,我们让不同的地区用不同的queue。只要保证同一个地区的订单把他们放到同一个Queue那就保证消费者先进先出了。

image-20230922161300764

这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。

3.2.3 如何保证集群有序

这里还有很关键的一点,在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 北京订单1,它拿完后,消费者2去queue拿到的是 北京订单2

拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到北京订单1,但由于网络等原因,消费者2比你真正的先消费消息,这是不是很尴尬了。

3.2.3.1 分布式锁

Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。

所以最终的消费者这边的逻辑就是

  • 消费者1去Queue拿 北京订单1,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。
  • 然后下一个消费者去拿到 北京订单2 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。
3.2.4 消息类型对比

全局顺序与分区顺序对比

Topic消息类型 支持事务消息 支持定时/延时消息 性能
无序消息(普通、事务、定时/延时) 最高
分区顺序消息
全局顺序消息 一般

发送方式对比

Topic消息类型 支持可靠同步发送 支持可靠异步发送 支持Oneway发送
无序消息(普通、事务、定时/延时)
分区顺序消息
全局顺序消息
3.2.5 注意事项
  1. 顺序消息暂不支持广播模式。
  2. 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
  3. 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
  4. 对于全局顺序消息,建议创建broker个数 >=2。

3.3 代码示例

3.3.1 队列选择器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SelectorFactory {
/**
* 工厂模式获取MessageQueueSelector
*
* @param value
* @return
*/
public static MessageQueueSelector getMessageQueueSelector(String value) {
//如果value不为空使用hash选择器
if (StringUtils.isNotEmpty(value)) {
return new SelectMessageQueueByHash();
}
//如果value为空使用随机选择器
return new SelectMessageQueueByRandom();
}
}
3.3.2 消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class MQProducer {

@Autowired
DefaultMQProducer defaultMQProducer;

/**
* 同步发送消息
* @param taxiBO
*/
public void send(TaxiBO taxiBO) {
if (null == taxiBO) {
return;
}
SendResult sendResult = null;

try {
//获取消息对象
Message message = RocketMQHelper.buildMessage(DispatchConstant.SEQ_TOPIC, taxiBO);
//根据区域编码获取队列选择器
MessageQueueSelector selector = SelectorFactory.getMessageQueueSelector(taxiBO.getAreaCode());
//发送同步消息
sendResult = defaultMQProducer.send(message, selector, taxiBO.getAreaCode(), 10000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null != sendResult) {
System.out.println(sendResult.toString());
}
}
}
3.3.3 消息消费者

消费者真正要达到消费顺序,需要分布式锁,所以这里需要将MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 消费消息
*/
public abstract class MQConsumeMessageListenerProcessor implements MessageListenerOrderly {
public static final Logger logger = LoggerFactory.getLogger(MQConsumeMessageListenerProcessor.class);

/**
* 消费有序消息
*
* @param list
* @param consumeOrderlyContext
* @return
*/
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

if (CollectionUtils.isEmpty(list)) {
logger.info("MQ接收消息为空,直接返回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
//消费消息
for (MessageExt messageExt : list) {
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
//调用具体消费流程
processMessage(topic, tags, body);
logger.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
logger.error("获取MQ消息内容异常{}", e);
//暂停当前队列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

// TODO 处理业务逻辑
return ConsumeOrderlyStatus.SUCCESS;
}

/**
* 处理消息
*
* @param body
*/
public abstract void processMessage(String topic, String tags, String body);
}

上面我们介绍了顺序消息,它主要将相同的消息投递到一个队列中的,具体如何投递呢

4. 消息投递策略

上面我们介绍了顺序消息,但是RocketMQ还支持那些投递策略呢、

RocketMQ 的消息模型整体并不复杂,如下图所示:

image-20230922161309364

一个Topic(消息主题)可能对应多个实际的消息队列(MessgeQueue)

​ 在底层实现上,为了提高MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体形式如上图所示。每个消息队列在使用中应当保证先入先出(FIFO,First In First Out)的方式进行消费。

那么,基于这种模型,就会引申出两个问题:

  • 生产者 在发送相同Topic的消息时,消息体应当被放置到哪一个消息队列(MessageQueue)中?
  • 消费者 在消费消息时,应当从哪些消息队列中拉取消息?

4.1 生产者投递策略

生产者投递策略就是讲如何讲一个消息投递到不同的queue中

4.1.1 轮询算法投递

默认投递方式:基于Queue队列轮询算法投递

​ 默认情况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀,算法如下图所示:

4.1.2 顺序投递策略

在有些场景下,需要保证同类型消息投递和消费的顺序性。

​ 例如,假设现在有TOPIC topicTest,该 Topic下有4个Queue队列,该Topic用于传递订单的状态变迁,假设订单有状态:未支付已支付发货中(处理中)发货成功发货失败

在时序上,生产者从时序上可以生成如下几个消息:

1
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中(处理中) --> 订单T0000001:发货失败

消息发送到MQ中之后,可能由于轮询投递的原因,消息在MQ的存储可能如下:

image-20230922161314539

​ 这种情况下,我们希望消费者消费消息的顺序和我们发送是一致的,然而,有上述MQ的投递和消费机制,我们无法保证顺序是正确的,对于顺序异常的消息,消费者 即使有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

​ 基于上述的情况,RockeMQ采用了这种实现方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 queue队列中,然后消费者再采用一定的策略(一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性

image-20230922161318772

生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

1
2
3
4
5
6
7
8
9
10
public interface MessageQueueSelector {
/**
* 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
* @param mqs 待选择的MQ队列选择列表
* @param msg 待发送的消息体
* @param arg 附加参数
* @return 选择后的队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
4.1.3 自带实现类
投递策略 策略实现类 说明
随机分配策略 SelectMessageQueueByRandom 使用了简单的随机数选择算法
基于Hash分配策略 SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
基于机器机房位置分配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配

4.2 消费者分配队列

RocketMQ对于消费者消费消息有两种形式:

  • BROADCASTING:广播式消费,这种模式下,一个消息会被通知到每一个消费者
  • CLUSTERING: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者上进行消费 模式如下:

image-20230922161322619

​ 对于使用了消费模式为MessageModel.CLUSTERING进行消费时,需要保证一个消息在整个集群中只需要被消费一次。实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过queue队列分配给消费者的方式完成的:也就是说,消息分配的单位是消息所在的queue队列

queue队列指定给特定的消费者后,queue队列内的所有消息将会被指定到消费者进行消费。

​ RocketMQ定义了策略接口AllocateMessageQueueStrategy,对于给定的消费者分组,和消息队列列表消费者列表当前消费者应当被分配到哪些queue队列,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 为消费者分配queue的策略算法接口
*/
public interface AllocateMessageQueueStrategy {

/**
* Allocating by consumer id
*
* @param consumerGroup 当前 consumer群组
* @param currentCID 当前consumer id
* @param mqAll 当前topic的所有queue实例引用
* @param cidAll 当前 consumer群组下所有的consumer id set集合
* @return 根据策略给当前consumer分配的queue列表
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);

/**
* 算法名称
*
* @return The strategy name
*/
String getName();
}

相应地,RocketMQ提供了如下几种实现:

算法名称 含义
AllocateMessageQueueAveragely 平均分配算法
AllocateMessageQueueAveragelyByCircle 基于环形平均分配算法
AllocateMachineRoomNearby 基于机房临近原则算法
AllocateMessageQueueByMachineRoom 基于机房分配算法
AllocateMessageQueueConsistentHash 基于一致性hash算法
AllocateMessageQueueByConfig 基于配置分配算法

为了讲述清楚上述算法的基本原理,我们先假设一个例子,下面所有的算法将基于这个例子讲解。

假设当前同一个topic下有queue队列 10个,消费者共有4个,如下图所示:

4.2.1 平均分配算法

​ 这里所谓的平均分配算法,并不是指的严格意义上的完全平均,如上面的例子中,10个queue,而消费者只有4个,无法是整除关系,除了整除之外的多出来的queue,将依次根据消费者的顺序均摊。

​ 按照上述例子来看,10/4=2,即表示每个消费者平均均摊2个queue;而10%4=2,即除了均摊之外,多出来2个queue还没有分配,那么,根据消费者的顺序consumer-1consumer-2consumer-3consumer-4,则多出来的2个queue将分别给consumer-1consumer-2

最终,分摊关系如下:

  • consumer-1:3个
  • consumer-2:3个
  • consumer-3:2个
  • consumer-4:2个

image-20230922161328954

4.2.2 环形平均分配

​ 环形平均算法,是指根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配

其基本模式如下:

image-20230922161332769

这种算法最终分配的结果是: consumer-1: #0,#4,#8 consumer-2: #1, #5, # 9 consumer-3: #2,#6 consumer-4: #3,#7

4.2.3 使用方式

默认消费者使用使用了AllocateMessageQueueAveragely平均分配策略

如果需要使用其他分配策略,使用方式如下

1
2
3
//创建一个消息消费者,并设置一个消息消费者组,并指定使用一致性hash算法的分配策略
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null,"rocket_test_consumer_group",null,new AllocateMessageQueueConsistentHash());
.....

5. RocketMQ消息保障

下面我们详细说下如何保障消息不丢失以及消息幂等性问题

5.1 生产端保障

生产端保障需要从一下几个方面来保障

  1. 使用可靠的消息发送方式
  2. 注意生产端重试
  3. 生产禁止自动创建topic
5.1.1 消息发送保障
5.1.1.1 同步发送

发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果,会在收到接收方发回响应之后才发下一个数据包的通讯方式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

image-20230922161337431

​ 简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。

使用场景

​ 由于这种同步发送的方式确保了消息的可靠性,同时也能及时得到消息发送的结果,故而适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等。在实际应用中,这种同步发送的方式还是用得比较多的。

注意事项

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

5.1.1.2 异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback

image-20230922161342687

​ 异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应,所以要记住一点,异步发送同样可以对消息的响应结果进行处理。

使用场景

​ 由于异步发送不需要等待 broker 的响应,故在一些比较注重 RT(响应时间)的场景就会比较适用。比如,在一些视频上传的场景,我们知道视频上传之后需要进行转码,如果使用同步发送的方式来通知启动转码服务,那么就需要等待转码完成才能发回转码结果的响应,由于转码时间往往较长,很容易造成响应超时,此时,如果使用的是异步发送通知转码服务,那么就可以等转码完成后,再通过回调接口来接收转码结果的响应了。

5.1.2 消息发送总结
5.1.2.1 发送方式对比
发送方式 发送 TPS 发送结果反馈 可靠性 适用场景
同步发送 一般 不丢失 重要的通知场景
异步发送 不丢失 比较注重 RT(响应时间)的场景
单向发送 最快 可能丢失 可靠性要求并不高的场景
5.1.2.2 使用场景对比

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
5.1.3 发送状态

发送消息时,将获得包含SendStatus的SendResult,首先,我们假设Message的isWaitStoreMsgOK = true(默认为true),如果没有抛出异常,我们将始终获得SEND_OK,以下是每个状态的说明列表:

5.1.3.1 FLUSH_DISK_TIMEOUT

​ 如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。

5.1.3.2 FLUSH_SLAVE_TIMEOUT

​ 如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。

5.1.3.3 SLAVE_NOT_AVAILABLE

​ 如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。

5.1.3.4 SEND_OK

​ 这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK,需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH

5.1.3.5 注意事项

​ 如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不关紧要的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制,如果收到了 SLAVE_NOT_AVAILABLE 就要赶紧通知管理员了。

5.1.4 MQ发送端重试保障

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

​ DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:

1
2
3
4
5
6
7
8
9
//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}

//同步发送消息,并指定超时时间
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
5.1.4.1 重试解惑

超时重试针对网上说的超时异常会重试的说法都是错误的

​ 是因为下面测试代码的超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报显然不应该是一个级别,但测试发现无论设置的多少次的重试次数,报异常的时间都差不多。

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RetryProducer {
public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

//指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(300000);
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息并返回结果,设置超时时间 5ms 所以每次都会发送失败
SendResult sendResult = producer.send(msg, 5);

System.out.printf("%s%n", sendResult);
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

揭晓答案

针对这个疑惑,需要查看源码,发现同步发送的时候,超时是不重试的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 说明 抽取部分代码
*/
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

//1、获取当前时间
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev ;
//2、去服务器看下有没有主题消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
//3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//4、根据设置的重试次数,循环再去获取服务器主题消息
for (times = 0; times < timesTotal; times++) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
if (timeout < costTime) {
callTimeout = true;
break;

}
//6、如果超时直接报错
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
}
}

异步重试

可以通过以下代码设置异步从重试次数,默认两次

1
producer.setRetryTimesWhenSendAsyncFailed(2);

异步重试是通过递归的方式来进行重试的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
MQClientAPIImpl#sendMessageAsync
private void sendMessageAsync(... ) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
.....
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
//发送重试请求
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, true, producer);
}
.....
MQClientAPIImpl#onExceptionImpl
private void onExceptionImpl(...){
//获取到当前重试次数
int tmp = curTimes.incrementAndGet();
//是否需要重试,并且重试次数小于timesTotal
if (needRetry && tmp <= timesTotal) {
...
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
...
}

}
5.1.4.2 重试总结

通过这段源码很明显可以看出以下几点

  1. 如果是异步发送默认重试次数是两次,通过递归的方式进行重试
  2. 对于同步而言,超时异常也是不会再去重试
  3. 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。
5.1.5 禁止自动创建topic
5.1.5.1 自动创建TOPIC流程

autoCreateTopicEnable设置为true 标识开启自动创建topic

  1. 消息发送时如果根据topic没有获取到 路由信息,则会根据默认的topic去获取,获取到路由信息后选择一个队列进行发送,发送时报文会带上默认的topic以及默认的队列数量。
  2. 消息到达broker后,broker检测没有topic的路由信息,则查找默认topic的路由信息,查到表示开启了自动创建topic,则会根据消息内容中的默认的队列数量在本broker上创建topic,然后进行消息存储。
  3. broker创建topic后并不会马上同步给namesrv,而是每30进行汇报一次,更新namesrv上的topic路由信息,producer会每30s进行拉取一次topic的路由信息,更新完成后就可以正常发送消息,更新之前一直都是按照默认的topic查找路由信息。
5.1.5.2 为什么不能开启自动创建

​ 上述 broker 中流程会有一个问题,就是在producer更新路由信息之前的这段时间,如果消息只发送到了broker-a,则broker-b上不会创建这个topic的路由信息,broker互相之间不通信,当producer更新之后,获取到的broker列表只有broker-a,就永远不会轮询到broker-b的队列(因为没有路由信息),所以我们生产通常关闭自动创建broker,而是采用手动创建的方式。

5.1.6 发端端规避

注意了,这里我们发现,有可能在实际的生产过程中,我们的 RocketMQ 有几台服务器构成的集群

其中有可能是一个主题 TopicA 中的 4 个队列分散在 Broker1、Broker2、Broker3 服务器上。

image-20230922161350213

​ 如果这个时候 Broker2 挂了,我们知道,但是生产者不知道(因为生产者客户端每隔 30S 更新一次路由,但是 NamServer 与 Broker 之间的心跳检测间隔是 10S,所以生产者最快也需要 30S 才能感知 Broker2 挂了),所以发送到 queue2 的消息会失败,RocketMQ 发现这次消息发送失败后,就会将 Broker2排除在消息的选择范围,下次再次发送消息时就不会发送到 Broker2,这样做的目的就是为了提高发送消息的成功率。

5.1.6.1 问题梳理

​ 例如在发送之前 sendWhichQueue 该值为 broker-a 的 q1,如果由于此时 broker-a 的突发流量异常大导致消息发送失败,会触发重试,按照轮循机制,下一个选择的队列为 broker-a 的 q2 队列,此次消息发送大概率还是会失败,即尽管会重试 2 次,但都是发送给同一个 Broker 处理,此过程会显得不那么靠谱,即大概率还是会失败,那这样重试的意义将大打折扣。

​ 故 RocketMQ 为了解决该问题,引入了故障规避机制,在消息重试的时候,会尽量规避上一次发送的 Broker,回到上述示例,当消息发往 broker-a q1 队列时返回发送失败,那重试的时候,会先排除 broker-a 中所有队列,即这次会选择 broker-b q1 队列,增大消息发送的成功率。

上述规避思路是默认生效的,即无需干预。

5.1.6.2 规则策略

但 RocketMQ 提供了两种规避策略,该参数由 sendLatencyFaultEnable 控制,用户可干预,表示是否开启延迟规避机制,默认为不开启。(DefaultMQProducer中设置这两个参数)

  • sendLatencyFaultEnable 设置为 false:默认值,不开启,延迟规避策略只在重试时生效,例如在一次消息发送过程中如果遇到消息发送失败,规避 broekr-a,但是在下一次消息发送时,即再次调用 DefaultMQProducer 的 send 方法发送消息时,还是会选择 broker-a 的消息进行发送,只要继续发送失败后,重试时再次规避 broker-a。
  • sendLatencyFaultEnable 设置为 true:开启延迟规避机制,一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息,这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,然后返回 notAvailableDuration 同一个下标对应的延迟值。
5.1.6.3 注意事项

​ 如果所有的 Broker 都触发了故障规避,并且 Broker 只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker 可用来,那岂不是更糟糕了?针对这个问题,会退化到队列轮循机制,即不考虑故障规避这个因素,按自然顺序进行选择进行兜底。

5.2 消费端保障

5.2.1 注意幂等性

应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样。这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。

“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系。要做到不丢消息就无法避免消息重复消费。原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误。从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错。为了确保不丢消息,重发消息是唯一的选择。

​ 有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。

5.2.2 消息消费模式

从不同的维度划分,Consumer支持以下消费模式:

  • 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;
  • 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。
5.2.2.1 集群消费

使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点

image-20230922161354814

注意事项

  • 消费端集群化部署, 每条消息只需要被处理一次。
  • 由于消费进度在服务端维护, 可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
5.2.2.2 广播消费

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

image-20230922161358592

注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
5.2.2.3 集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

image-20230922161401910

注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。
5.2.3 消息消费模式

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

  • 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
  • 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。
5.2.3.1 推模式(PUSH)

我们上面使用的消费者都是PUSH模式,也是最常用的消费模式

​ 由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

实现方式,代码上使用 DefaultMQPushConsumer

​ consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。

5.2.3.2 拉模式(PULL)

RocketMQ的PUSH模式是由PULL模式来实现的

​ 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。

5.2.3.3 注意事项

注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer

DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

image-20230922161407267

DefaultLitePullConsumer

该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理

5.2.4 消息确认机制

consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)

​ 为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

5.2.4.1 确认消费

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

1
2
3
4
5
6
7
8
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//执行真正消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
})
5.2.4.2 消费异常

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

1
2
3
4
5
6
7
8
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//执行真正消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER
}
})

​ 为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

5.2.5 消息重试机制
5.2.5.1 顺序消息的重试

​ 对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

5.2.5.2 无序消息的重试

​ 无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

5.2.5.3 重试次数

消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

​ 如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。

5.2.5.4 和生产端重试区别

消费者和生产者的重试还是有区别的,主要有两点

  • 默认重试次数:Product默认是2次,而Consumer默认是16次
  • 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。

注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。

5.2.5.5 重试配置方式

需要重试

消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 方式1:返回RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常

无需重试

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//消息处理逻辑抛出异常,消息将重试。
try {
doConsumeMessage(list);
}catch (Exception e){
//捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//业务方正常消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

5.3 死信队列

在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)

​ 当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

5.3.1 死信特性
5.3.1.1 死信消息特性
  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除,故死信消息应在产生的 3 天内及时处理
5.3.1.2 死信队列特性
  • 一个死信队列对应一个消费者组,而不是对应单个消费者实例
  • 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
  • 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列

6. Redis 轮询队列

redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中

image-20230922161413770

6.1 相关代码

6.1.1 redis获取车辆

从list左侧弹出一个车辆

1
2
3
4
5
6
7
8
9
10
11
/**
* 从Redis List列表中拿取一个车辆ID
* 如果没有获取到延时10S
*
* @return
*/
public String takeVehicle() {
//从Redis List列表中拿取一个车辆ID
return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}

6.1.2 redis压入车辆

检查车辆状态,并从右侧压入车辆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 设置车辆状态为Ready
*
* @param vehicleId
*/
public void readyDispatch(String vehicleId) {
//检查车辆状态
DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
//如果车辆时运行状态
if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
//从右侧压入车辆
redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
}
}