rocketmq 原理解析

本文最后更新于:7 个月前

rocketmq 原理解析

RocketMQ 如何保证消息不丢失/保证可靠性

消息在 RocketMQ 流转大概可以分为三个阶段:发送阶段、存储阶段、消费阶段,那么可靠性就要从这三个阶段考虑。

  • 生产者发送消息时主要依靠发送确认来确保消息可靠性的。同步异步发送都可以获取到发送状态,通过这个发送状态来判断本次消息是否成功发送。另外,发送消息时还可以指定一个超时时间,如果超出这个超时时间可以再次发送。
  • 消费者消费消息时也存在一个消费确认机制,当消费者消费消息成功或失败都会给 Broker 返回消费状态,消费成功则结束本次流程;消费失败 Broker 则会重新发送消息。如果停电、宕机 Broker 都不会认为消费成功,也会继续重新投递。
  • Broker 存储阶段保证消息不丢失的手段就是把消息记录到 CommitLog 中,保证消息不会丢失。

关于保证成功记录到 CommitLog 有两种方式:

  • 同步刷盘,生产者把消息发送到 Broker 后,只有 Broker 成功地把消息写入到 CommitLog 后,才能给生产者返回发送成功的 ACK。这种方式可靠性更高,但是牺牲了效率。
  • 异步刷盘,Broker 把消息写入到 CommitLog 采用后台异步线程刷盘的方式,刷盘完成后回调接口返回发送成功的 ACK。可以降低读写延迟,提高 RocketMQ 的吞吐量,但是当 Broker 宕机时会丢失部分未从内存中写入到文件的消息。

RocketMQ 中消息重复的问题

在分布式消息队列中,同时确保消息不丢失和不重复是很难的,RocketMQ 选择了保证消息不丢失,消息重复的问题需要在业务端自行解决。

在 RocketMQ 中,造成消息重复的根本原因是网络波动,会导致消费者收到两条一样的消息。

RocketMQ 不保证消息不重复,如果要严格确保不重复,需要在业务端去重,可以使用如下手段:

  • 多次消费不影响:消费端的消息处理业务逻辑保证幂等性,这样无论消息消费多少次都对业务没有影响;
  • 过滤重复消息:生产者发送时确保每一条消息都有唯一编号(业务相关的比如说是订单号),建立一个消费记录表,当拿到这个消息时保存到数据库,给这个消息做唯一约束,当出现重复消费时,唯一约束就不满足,那么就抛弃这条消息。

RocketMQ 消息堆积问题

消息队列其中一个很重要的作用就是削峰,那么消息队列必然有一定的消息堆积能力来顶住请求的洪峰来保证后端服务的稳定性。

如果发生消息积压,这时候需要考虑如何让消费者提高消费能力,可以从以下情况考虑:

  • 如果 Queue 的数量大于消费者数量,这时需要做的是消费者扩容,默认消费模式是集群消费模式,消息会雨露均沾地发送给消费者,所以可以让消费者数量增加到和 Queue 的数量一致。

img

  • 如果 Queue 的数量小于或等于消费者数量还发生大规模消息堆积时,这种情况无论再增加消费者数量,消费能力都不会提升,这时需要做的是消息迁移 Queue 扩容,具体做法是修改消费者逻辑,让消费者把这些消息使用一个临时的 Topic,这个 Topic 下建更多的 Queue,把原来的消息转发到这些 Queue 上,另外安排对应这个临时 Topic 的消费者来消费这些堆积的消息。

img

RocketMQ 中顺序消息的问题

RocketMQ 的顺序消息包含两个层面,有顺序地生产消息以及有顺序地消费消息。有些业务场景下必须保证顺序,比如订单的生成、付款、发货,这个顺序是必须保证的。

RocketMQ 顺序消息可以分为全局有序以及分区有序,全局有序与分区有序的区别与落地在这篇文章中也适当介绍了:RocketMQ 操作落地 (rocketmq-client 方式)

全局有序消息

img

分区有序消息

img

如果要严格控制消息的顺序,那么生产者、queue、消费者最好都是一对一的关系,把整个流程中并发的部分全都消除了,各部分都设计成单线程工作。但是这样的设计,完全牺牲了 RocketMQ 高并发高吞吐的特性,也容易成为系统性能瓶颈。

但是全局有序导致性能低下的问题 RocketMQ 不打算解决,理由如下:

  • 乱序的应用实际上大量存在,系统中一般极少数情况需要对消息做严格的顺序;
  • 消息在队列中无序,不代表最终消息也是无序的,可以使用其他手段来控制顺序。比如给消息打上标识顺序的标签,在业务层再处理顺序问题等。

RocketMQ 延时消息原理

SCHEDULE_TOPIC_XXXX 介绍

SCHEDULE_TOPIC_XXXX 是 RocketMQ 一个系统类型的 Topic,用于标识延时消息。

这个 Topic 有 18 个队列,分别唯一对应着 RocketMQ 的 18 个延时等级,对应关系为:queueId = delayTimeLevel – 1

ScheduleMessageService 介绍

这是 Broker 中的一个延时服务,专门消费 Topic 为 SCHEDULE_TOPIC_XXXX 的延时消息,并将其投递到目标 Topic 中。

ScheduleMessageService 在启动时,会创建一个定时器 Timer,并根据延迟级别的个数,启动对应数量的 TimerTask,每个 TimerTask 负责一个延迟级别的消费与投递。

延时消息在 Broker 的流转过程

生产者发送延时消息到 Broker,再到消费者消费的过程,消息将经过以下流转:

img

  1. Broker 把消息的 Topic 修改成SCHEDULE_TOPIC_XXX,然后根据本次消息的延时等级计算需要投递到的具体队列。同时还要把消息原来的 Topic 及其队列信息存储到消息的属性中,方便后面正确投递。
  2. 在从 CommitLog 把消息转发到 queue 的过程中,会计算这个延时消息需要在什么时候进行投递,投递时间=消息存储时间+延时等级对应的时间
  3. 延时消费服务 ScheduleMessageService 消费这个延时消息。
  4. 从消息属性中取出并设置原来消息的 Topic 和队列信息,存储到 CommitLog. 此时这个消息已经完成延时,和普通消息没有区别,所以 ConsumeQueue 中的 Message Tag HashCode 需要重新计算消息 Tag 的哈希值再存储。
  5. 由于消息的 Topic 已经修改为原来的 Topic,所以直接投递到对应的队列中。
  6. 消费者消费这条消息。

RocketMQ 事务消息原理

实现事务消息核心

  • 两阶段提交:第一阶段生产者发送 Half 消息到 Broker 来测试 RocketMQ 是否正常;Broker 只有在收到第二阶段的消息时,消费者才能对消息进行消费。
  • 事务补偿机制:当 Broker 收到状态为 unknown 的消息或者由于网络波动、生产者宕机导致长时间没有收到第二阶段的提交时,Broker 会调用生产者接口来回查本次事务的状态。

这两个核心是实现分布式事务最终一致性的关键。

事务消息的流程

以支付订单后奖励积分为例,此时生产者是订单系统,消费者是积分系统,当积分系统收到订单系统传来订单支付成功,那么就给用户提供积分的奖励

img

  • 订单系统会发送一条 Half 消息到 RocketMQ 中,这个 Half 消息其实是一个代表订单成功支付的消息,只不过目前这个状态积分系统是无法感知这个消息的存在的。
  • 如果发送 Half 消息后没有收到 MQ 的响应,那么可以认定 MQ 此时有问题,那么就在订单系统中回滚这笔订单,例如订单关闭或者发起退款。
  • 如果收到 MQ 的响应,那么可以认定 MQ 是正常的,订单系统可以执行自己的本地事务,比如更新订单状态。
  • 如果在处理自己系统的业务时,本地事务发生异常了,那么就发送一个 rollback 请求到 MQ 中,让 MQ 删除之前发送的 Half 消息;如果业务逻辑成功执行、本地事务成功提交,那么就发送一个 commit 请求到 MQ 中,MQ 收到 commit 请求后,之前的 Half 消息也就对积分系统可见了;如果业务逻辑的事务状态为 unknown ,那么 MQ 就会发起回查,回查生产者本地事务的状态。
  • 假设由于网络波动、生产者重启导致事务消息的二次确认丢失,MQ 也有补偿措施,它会去扫描自己处于 Half 状态的消息,如果这个 MQ 一直没有接收到对这个 Half 消息的第二阶段的提交,会回调一个接口,让订单系统查询这个订单的状态,进而进行第二阶段的提交。所以这个回查的接口非常重要,要保证提交 commit 或者 rollback。

死信队列原理

死信队列用于处理无法被正常消费的消息,即死信消息。

死信消息的产生

当一条消息在消费失败时,RocketMQ 消费者会自动对消息进行重试消费;当重试失败次数达到最大值依然是失败时,那么可以认为消费者在正常情况下是无法消费这个消息的。此时,RocketMQ 并不会立刻丢弃这个消息,而是将其发送到消费者对应的死信队列中。

死信消息放到死信队列中后不会再被原来的消费者消费,此时他的 Topic 变成了 %DLQ%消费者组 ,需要使用另外的消费者来订阅消费这些死信消息。死信消息可以助于我们统计异常数据并做后续的数据修复处理。

img

默认的重试次数与重试间隔时间如下:

重试次数 重试间隔时间 重试次数 重试间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

其中重试间隔时间可以通过修改 Broker 的配置文件的 messageDelayLevel 配置项来修改。

# 消息重试16次分别间隔
messageDelayLevel =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

死信队列的特点

一个死信队列对应一个消费者组,不对应某一个消费者实例或 Topic。


rocketmq 原理解析
https://lunasaw.github.io/2022/09/06/basic-middle-rocketmq/
著者
luna
作成日
2022年9月6日
著作権