编程人 cdmana.com

RocketMQ原理

基础概念

消息模型(Message Model)

RocketMq主要由三部分组成:Producer(发送消息)、Broker(存储消息)、Consumer(消费消息)。Broker实际部署中对应一台服务器,每个Broker可以存储多个Topic消息,每个Topic消息也可以分片存储不同的Broker中,Message Queue用于存储消息的物理地址,每个Topic消息地址存储多个Message Queue中。

生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

生产者中会把同一类Producer组成一个集合叫生产者组,如果发送事务消息后生产者宕机,则Broker服务会联系同一生产者组其它生产者实例以提交或回溯消费者。

消费者(Consumer)

负责消费消息,后台系统负责异步消费,消费者从Broker拉取消息并提供给应用程序,提供两种消费形式:拉取消费、推动消费。

  • 拉取消费通常主动调用Consumer拉消息方法从Broker拉消息,主动权由应用控制。
  • 推动消费由Broker收到数据后主动推送给消费者,该模式实时性好。

消费者同样会把一类Consumer组成一个集合:消费者组,实现负载均衡和容错变得非常容易,消费者组必须订阅相同的Topic,RocketMQ支持两种消费模式:集群消费Clustering,广播消费Broadcasting。

  • 集群消费下相同Consumer Group每个Consumer实例平均分摊消息。
  • 广播消费下相同Consumer Group每个Consumer实例接收全量消息。

主题(Topic)

表示一类消息集合,每个主题包含若干条消息,每条消息只属于一个主题。
建议一个App建一个Topic通过Tag来表示不同类型的消息。

消息(Message)

生产和消费最小单位,每条消费有唯一Message ID,可以携带业务key,系统提供了Message ID和key查询功能。也可以为消费打tag标签,过滤不同业务消息。

消息存储

何时存储消息

分布式队列具有可靠性,所有数据要进行持久化存储。

  • MQ收到消息后向生产者返回一个ACK响应,并将消息存储起来。
  • MQ Push一条消息给消费者后等待消费者ACK响应,将消息标记为已消费,如没有标记为已消费MQ不断尝试往消费者推送消息(默认16次)。
  • MQ定期删除一些过期消息,保证服务一直可用。

存储介质

MQ和Kafka一样通过文件存储机制,直接用磁盘文件来保存消息,不需要借助MySQL这一类索引工具。

  • 采用顺序IO和零拷贝技术加速文件读写。
  • 异步刷盘采用MMAP机制刷盘,每次只能映射1.5G~2G。

存储结构

所有消息存储在CommitLog文件中。
ConsumerQueue消息索引,每个topic建立一个索引文件,CommitLog Offset和Message Tag建立索引。
IndexFile:通过key,CommitLog Offset、TimeStamp、NextIndexOffset建立索引方便用户更精确定位消息。

在这里插入图片描述

主从复制

Broker集群方式部署,有一个master节点和多个slave节点,消息需要从Master复制到Slave上,消息复制方式分为同步和异步。

  • 同步复制:Master和Slave都写入消息成功后才反馈给客户端写入成功状态。如果Master节点故障,Slave有全部数据备份,容易恢复,但是大数据写入延迟,降低系统吞吐量。

  • 异步复制:只有Master写入成功就返回成功状态,再异步复制给Slave节点。异步复制系统拥有较低的延迟和较高的吞吐量,如果Master宕机而数据没有复制完就会造成数据丢失。

在这里插入图片描述

负载均衡

Producer默认采用轮询方式,发送到不同的MessageQueue中,如果是顺序消息发送到指定MessageQueue中。

在这里插入图片描述

Consumer如果是广播(BROADCASTING)模式那么每个消费者全部收到消息。
Consumer如果是集群(CLUSTERING)模式那么只能由一个消费者收到消息。

  • AllocateMessageQueueAveragely(默认):平均分配。当消息队列个数小于可消费客户端时,消息队列与客户端对应情况如左侧图;当消息队列个数大于可消费客户端时,消息队列与客户端对应情况如右侧图。
    在这里插入图片描述

  • AllocateMessageQueueAveragelyByCircle:环形分配。以环形方式平均分配。
    在这里插入图片描述

  • AllocateMachineRoomNearby:同机房分配。首先统计消费者与broker所在机房,保证broker中的消息优先被同机房的消费者消费,如果机房中没有消费者,则有其他机房消费者消费。实际的队列分配(同机房或跨机房)可以是指定其他算法。
    在这里插入图片描述

  • AllocateMessageQueueByMachineRoom:指定机房分配。第一张图是消费者小于队列数情况,第二张图是消费者多余队列数情况。假设有三个机房,配置机房三不在消费者的服务范围内,则实际消费对应关系如下两图所示。
    在这里插入图片描述

在这里插入图片描述

  • AllocateMessageQueueByConfig:手动分配。使用一致性哈希算法进行负载,每次负载都会重新创建一致性hash路由表,获取本地客户端负责的所有队列信息。RocketMQ默认的hash算法为MD5。假设有4个客户端的clientId和两个消息队列mq1,mq2,,通过hash后分布在hash环的不同位置,按照一致性hash的顺时针查找原则,mq1被client2消费,mq2被client3消费。

消息重试

  • 广播消息:不存在消息重度机制,即消息消费失败后不会再重新进行发送,而只是继续消费新的消息。
  • 普通消息:当消费者消费失败后,可以通过设置返回状态达到消息重试结果。返回Action.ReconsumerLater(推荐),返回null,抛出异常。

重试消息需要进入一个%RETRY%+ConsumeGroup队列中。默认允许每条消息最多重试16次,每次重试时间间隔如下:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10秒 2 30秒
3 1分钟 4 2分钟
5 3分钟 6 4分钟
7 5分钟 8 6分钟
9 7分钟 10 8分钟
11 9分钟 12 10分钟
13 20分钟 14 30分钟
15 1小时 16 2小时

如果重试次数超过16次将转为死信队列。
注意:无论重试多少次Message ID全一样,事务消息除外。

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试,而如果消息超过最大重度次数,RocketMQ就会认为这个消息有问题,将消息放入死信队列。
死信队列名称:%DLQ%+ConsumeGroup

特征:

  • 一个死信队列对应一个ConsumeGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信消息,RocketMQ就不会为某创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里所有列信消息,不区分Topic。
  • 死信队列中消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,可通过broker.conf对应的fileReservedTime属性配置,超过这个时间会被删除,不管消息是否消费过。
  • 列信队列默认权限:2(2:禁读,4:禁写,6:可读可写),需要改成6可能正常消费。

消息幂等

在MQ系统中对于消息幂等有三种实现语义:

  • at most once最多一次:每条消息最多只会被消费一次。
  • at least once至少一次:每条消息至少会被消费一次。
  • exactly once刚刚好一次:每条消息只会确定的消费一次。

at most once最好保证,采用异步发送、sendOneWay等方式都可保证。
at least once同步发送、事务消息等多种方式也可保证。
exactly once最难保证,RocketMQ只能保证at least once,保证不了exactly once,需要通过业务进行保证。建议发送消息时候带业务key,例如订单id,消费前判断订单是否被处理过。

Scroll to Top