-
- 1.1. 使用MQ会带来哪些问题?
- 1.2. MQ选型
- 1.3. RocketMQ的优点有哪些?
- 1.4. 🌟什么场景下使用MQ?为什么要用MQ?
-
- 2.1. 讲一讲rocketmq中的架构?消息消费模型?
- 2.1.1. producer
- 2.1.2. consumer
- 2.1.3. broker
- 2.1.4. Nameserver
- 2.2. RocketMQ为什么不用zookeper做注册中心?谈谈你的理解?
- 2.1. 讲一讲rocketmq中的架构?消息消费模型?
-
- 3.1. 消费模式有哪些?对于不同的消费模式如何保存消费进度?
- 3.2. 什么是exactly once?rocketmq如何保证exactly once?
- 3.3. 什么是消息重试
- 3.4. 什么是死信队列?
- 3.5. 生产和消费的queue负载均衡算法了解吗?
- 3.6. RocketMQ的长轮询了解吗?
-
- 4.1. rocketmq的存储结构讲一讲?
- 4.1.1. commitLog
- 4.1.2. consumequeue
- 4.1.3. indexFile
- 4.2. rocketmq持久化机制讲一讲?
- 4.3. rocketmq为了提升读取采用了哪些手段?
- 4.1. rocketmq的存储结构讲一讲?
-
- 5.1. 如何解决重复消费问题?
- 5.2. 如何保证消息不丢/可用性/可靠性?
- 5.3. 如何解决消息挤压问题?
-
- 6.1. 如何实现延迟消息?
- 6.2. 如何实现分布式事务消息?
- 6.3. 顺序消息怎么实现?
- 6.3.1. Producer顺序发送
- 6.3.2. Consumer顺序接收
- 6.4. 如何实现消息的过滤?
1. MQ基础
1.1. 使用MQ会带来哪些问题?
- 系统可用性降低:MQ挂了怎么办?
- 系统复杂性提高:重复消费怎么办?消息丢失怎么办?怎么保证消息传递的顺序性?
- 一致性问题:发MQ消息,也写库,无法用事务保证一致性
1.2. MQ选型
- ActiveMQ比较古老了,不推荐使用
- RabbitMQ常用于开源软件,单机并发量不高。而且基于erlang开发,不适合二次开发
- RocketMQ和kafka都可以达到10w级别的吞吐量,且都是基于java开发,方便进行二次开发
1.3. RocketMQ的优点有哪些?
- 10w并发量
- 支持10亿级别的消息堆积
- 支持中从架构,支持选主(可用性)
- 消息可靠性高,经过参数配置,可以做到0丢失(一些持久化机制等)
1.4. 🌟什么场景下使用MQ?为什么要用MQ?
-
解耦:系统A要向系统B,C,D发送消息。但是这中间可能某段时间D不再需要接受消息了,或者又要又要新加一个消息接收方。这时候可以把A的消息先丢到MQ里面做解耦。
-
异步:系统A接受到请求后要花3ms写库,同时需要通知B,C,D系统写库,需要1s,但是不需要B,C,D的执行结果。替换为MQ只需要3ms就可以返回而不是1s
-
削峰:系统A的请求数量不稳定,在白天是1000QPS,到晚上是10QPS,那么可以加一个MQ,但是注意白天会有消息挤压问题。
2. RocketMQ 架构
2.1. 讲一讲rocketmq中的架构?消息消费模型?
2.1.1. producer
producer是生产者端,存在形式是生产者组
RocketMQ提供了三种发送消息的方式:
- 同步发送,指每发送一批消息会在Broker响应ACK(阻塞等待)之后才发送下一批消息。<1> 可靠性要求高 <2> 数据量级较少 <3> 实时响应
- 异步发送,不需要Broker端发送确认,需要写回调函数OnSuccess和OnException,不阻塞。<1> 数据并发量大,可靠性没那么高
- 直接发送:Broker不需要发送ACK,无返回值 <1> 发送方无法感知发出去的消息的状态
2.1.2. consumer
Consumer以消费者组的形式存在
Consumer的消费方式有两种:
- pull模式:Consumer需要主动拉取消息
- push模式:Broker通过与Consumer建立长连接的方式主动推送消息
2.1.3. broker
- 相当于一个节点
- 在一台Broker中会有多个Topic。Topic是发布和订阅的基础单位,对于相同类别的消息,应该被发送到同一个topic中。
- 在Topic中有tag概念,tag是一个二级的标签,在生产和消费的时候可以指定tag
- 对于一个Topic,其中包含多个queue,生产者和消费者会根据queue选择算法来选择生产和消费的queue
2.1.4. Nameserver
- 不同于kafka通过zk作为注册中心,RocketMQ自己实现了注册中心Nameserver
- producer生产时先从Nameserver获取broker的地址,consumer在消费的时候也会先从Nameserver获取Broker地址
- 新增的Broker会发送注册信息到Nameserver注册,并使用心跳机制保持与Nameserver的连接
- 各个Nameserver是无状态的
2.2. RocketMQ为什么不用zookeper做注册中心?谈谈你的理解?
基于性能的考虑,NameServer本身的实现非常轻量,而且可以增加机器的方式水平扩展
相比而言Zookeper的实现太重,根据ZAB协议每一个写请求都要保持写一个事务日志,再加上定期要把内存数据镜像到磁盘。
3. RocketMQ 生产和消费
3.1. 消费模式有哪些?对于不同的消费模式如何保存消费进度?
消费模式分为集群消费和广播消费
集群消费下同一个消费组可以对同一Topic下的消息只能消费一次,广播消费可以被消费者组消费多次(每个消费者都消费一次)
对于集群模式,其消费进度offset是保存在Broker端的,对于广播模式,其消费进度offset是保存在consumer端的内存中的
3.2. 什么是exactly once?rocketmq如何保证exactly once?
要满足at least once,rocketmq使用一个消费记录进度器(offset),consumer消费成功之后会提交消息的offset,并保存在消费记录进度器中。消息消费失败也会进行重试机制。
为了保证不重复发送,可能还需要额外的处理比如message id。
3.3. 什么是消息重试
RocketMQ(5)---RocketMQ重试机制 - 雨点的名字 - 博客园 (cnblogs.com)
分为Producer重试和Consumer重试,Consumer重试分异常重试和超时重试,超时重试是指的返回消费状态超时
1、默认重试次数:Product默认是2次,而Consumer默认是16次。
2、重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。
3、Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。
3.4. 什么是死信队列?
- 如果消息消费失败,则会进行重试机制,默认最多重试次数16次,且每一次的延迟都会比前一次高
- 如果超过16次仍消费失败,那么就会将消息加入死信队列
- 死信队列本质是一个特殊的topic,其topic的name为%RETRY%consumerGroup@consumerGroup
- 一旦被加入到死信队列,就意味着消费的代码出现了问题。<1>不会再正常消费,需要手动处理。<2>默认保存时间三天,三天后删除
3.5. 生产和消费的queue负载均衡算法了解吗?
3.5.1. producer
- 轮询算法:保证每个queue均匀地获取消息,有随机、hash取余。缺点:某个queue投递延迟严重会发生堆积
- 最小投递延迟:统计最小投递延迟消息时间,发送给延迟最小的queue
3.5.2. consumer
分为消息粒度和队列粒度
- 🌟环形分配策略:将queue看成环,将queue依次分给consumer(q1分配给c1, q2分配给c2, …)
- 🌟一致性hash策略:将queue的hash值放在换上,consumer的hash值也放在环上,顺时针找到的第一个queue
- 平均分配策略:n个queue,m个consumer,每个consumer分为n/m个queue
- 同机房策略
3.6. RocketMQ的长轮询了解吗?
所谓的长轮询,就是Consumer 拉取消息,如果对应的 Queue 如果没有数据,Broker 不会立即返回,而是把 PullReuqest hold起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
4. 存储
4.1. rocketmq的存储结构讲一讲?
4.1.1. commitLog
- 当写到broker后,数据文件先写到commitLog,是顺序写的(写的速度快,类比LSM Tree)
- commitLog的大小为1GB,文件名为起始消息的偏移量,超过1GB会写到下一个commitLog
- commitLog记录了消息的内容,所属的topic,所属的queue等信息
4.1.2. consumequeue
- consumequeue文件可以加速对commitLog的查询
- commitLog记录了对应的记录在commitLog中的offset,消息的len和消息的hashCode
4.1.3. indexFile
- indexFile提供了按key和时间戳查找数据文件的能力
- 每个新来的key先hash后再mod slot_num分配到对应的slot
4.2. rocketmq持久化机制讲一讲?
rocketmq分为同步刷盘和异步刷盘,两者都要先写入Linux的page cache。不同点在于rocketmq的刷盘时机不一样,不一样的在于同步刷盘要求page cache刷入磁盘才返回ACK,异步刷盘写入page cache就返回ACK。所以异步刷盘可能有少量的数据丢失。
4.3. rocketmq为了提升读取采用了哪些手段?
- 为了改进随机读,对于数据量小的consumequeue充分利用page cache的预读取机制,性能高近乎内存
- 使用mmap对rocketMQ进行内存映射,直接独写page cache的内存,减少一次内核态到用户态的内存拷贝
5. 线上问题
5.1. 如何解决重复消费问题?
需要consumer端来手动保证,一般有两种方法
- 消息幂等:保证重复消费消息的结果是一样的,如redis的set操作
- 消息去重:需要保证每条消息都有一个唯一id,然后用一个流水表,如果流水表中有了则不能再消费了。可以用redis加速查找。
5.2. 如何保证消息不丢/可用性/可靠性?
消息在producer,broker和consumer都有可能丢失,因此分三方面来考虑:
- producer:不论是同步发送还是异步发送,都需要检查发送是否异常,如果是同步发送则检查返回值,如果是异步发送则在回调方法中检查(oneway方法就别想了)
- broker:<1>从持久化策略上:同步刷盘可以保证不丢,异步刷盘可能会有一部分丢失 <2>RocketMQ支持集群主从模式,master挂了slave依然可以变成master
- consumer:consumer端保证消息不丢是执行完业务逻辑后才返回ACK
5.3. 如何解决消息挤压问题?
- 消费者数量不足:增加消费者数量
- 消费速度提升:引入线程池,本地消息存储后即返回成功,后续慢慢消费
- 降低生产者的生产速度
- 调整Rocket的配置参数:比如消息拉取间隔,消费模式
- 增加Topic队列数:增加queue
6. 应用
6.1. 如何实现延迟消息?
-
用法:RocketMQ在生产时支持指定延迟等级,RocketMQ支持的延迟等级是有限的,但是可以通过配置文件来修改
-
实现:<1> 有延迟的消息先写到内存中,等到时间到了再写到commitLog中 <2> 在往内存(queue)中写的时候算出投递时间(创建时间 + 延迟时间)。
6.2. 如何实现分布式事务消息?
- 任务补偿机制:本地消息表,本地业务和本地消息表的写入作为一个事务。MQ发送失败则通过任务补偿轮询重发。
- 事务消息机制:先发送半消息,prepare状态,此时不可消费,再根据本地事务执行结果判断该消息是commit还是rollback。如果半消息发送失败了则业务也不会被执行,如果prepare状态半消息长时间没有收到commit还是rollback则会更新为unknown状态并主动询问。
6.3. 顺序消息怎么实现?
6.3.1. Producer顺序发送
RocketMQ只能保证同一队列的有序性
send的时候传入重写的MessageQueueSelector,保证需要发送的消息在同一队列
6.3.2. Consumer顺序接收
使用MessageListenerOrderly模式接收消息,保证同一队列的有序消息给同一个消费者。需要给Broker的queue上锁。为了防止消费者被同一线程消费,则消费者本地的queue也要加锁。
6.4. 如何实现消息的过滤?
只能在consumer过滤
-
Tag标签过滤:<1>单Tag匹配 <2>多Tag匹配 Tag1 || Tag2 <3>全部匹配 *
-
SQL属性过滤:发送方需要需要加属性
7. 高可用
主要体现在集群和主从上
7.1. 主从
Broker集群支持两种角色Master和Slave,Master支持读和写,Slave只支持读。当Master宕机时,Rocket会通过Raft算法选举出新的Slave
7.2. 集群
RocketMQ可以对一个Topic配置多个Broker,这样当一个Broker宕机,消息仍然可以打到其他Broker