RocketMQ版本
- version: 5.1.0
RocketMQ中consumer消费模型
在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型
我们知道在我们创建topic的时候需要指定一个参数就是读队列数
这里假设我们的topic是xiaozoujishu-topic,我们的读队列数 是4个,我们同一gid下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢 首先需要明确的是:
- 这里我们的消费模式是集群消费
- queue的负载均衡算法是使用默认的AllocateMessageQueueAveragely(平均分配) 假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:
四个队列分配给一个消费者
此时如果我们再启动一个消费者,那么这时候就会进行Rebalance,然后此时我们的队列分配就变成如下:
所以通过上面的队列分配我就知道Rebalance是个啥了,我们下面对Rebalance进行一些定义
RocketMQ的Rebalance是什么
Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配
Rebalance的目的
从上面可以看出Rebalance的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力
但是Rebalance也带来了一些危害,后面我们会重点分析下
Rebalance的触发原因
我们这里先说结论
- 订阅Topic的队列数量变化
- 消费者组信息变化
这里是最深层的原因,就是topic的队列数量、消费组信息 实际我们可以将这些归结为Rebalance的元数据,这些元数据的变更,就会引起clinet的Rebalance
注意RocketMQ的Rebalance是发生在client
这些元数据都在管broker管理 核心就是这三个类
- TopicConfigManager
- SubscriptionGroupManager
- ConsumerManager
只要这个三个类的信息有变化,client就会进行Rebalance。 下面我们可以具体说下什么情况下会让这三个类变化
订阅Topic的队列数量变化
什么情况下订阅Topic的队列数量会变化呢?
- broker扩容
- broker缩容
- broker宕机(本质也是类似缩容)
消费者组信息变化
什么时候消费者组信息会变化呢?
核心就是consumer的上下线,具体细分又可以分为如下原因:
- 服务日常滚动升级
- 服务扩容
- 服务订阅消息发生变化
源码分析
上面大致介绍了Rebalance的触发原因,现在我们结合源码来具体分析下
我们就从consumer的启动开始分析吧
这里我们以最简单的demo为例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
//return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
这里我们直接注意到 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 这个方法,看名字就知道是client向所有的broker发送心跳
我们进入到sendHeartbeatToAllBrokerWithLock方法看看
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
if (this.brokerAddrTable.isEmpty()) {
return;
}
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
String brokerName = brokerClusterInfo.getKey();
HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
if (oneTable == null) {
continue;
}
for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
Long id = singleBrokerInstance.getKey();
String addr = singleBrokerInstance.getValue();
if (addr == null) {
continue;
}
if (consumerEmpty && MixAll.MASTER_ID != id) {
continue;
}
try {
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e);
}
}
}
}
这段代码主要是通过this.brokerAddrTable.entrySet()获取到所有的master broker地址,然后进行心跳发送
具体的心跳发送代码实际是在下面代码中进行的
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT
我们看看RequestCode.HEART_BEAT的调用找到`broker的处理逻辑
很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor类的processRequest
我们具体进去看看这个方法
可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看
进去这个方法我们能看到一个比较核心的方法 registerConsumer
很明显这个方法就是注册consumer的方法
这个方法里面和Rebalance相关比较核心的方法就是这三个
- consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
这里我们可以看看clientChannelInfo里面是个啥玩意
具体深入到updateChannel方法里面就是判断是否为新的client,是就更新channelInfoTable
- updateSubscription
这个方法就是判断订阅关系是否发生了变化并更新订阅关系
- callConsumerIdsChangeListener
- callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); 这个方法就是通知client进行Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE事件
这里我们可以简单看看事件定义的类型有哪些
我们直接看看具体的事件处理类
可以看到实现类有多个,我们直接看broker模块的DefaultConsumerIdsChangeListener类即可
可以看到这里是给该group所有的client发送Rebalance消息
具体的消息状态码是 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
client Rebalance
通过上面我们大致找到了整个通信过程,但是实际的Rebalance是发生在client,所以我们还是需要继续回到client的代码
我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED 找到client的处理类ClientRemotingProcessor
实际处理方法就是
this.mqClientFactory.rebalanceImmediately();
我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance线程
所以实际的方法调用还是在RebalanceService的 run方法
最终还是调用的是MQConsumerInner接口中的doRebalance方法
这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?
原来是cleint也会定时去Rebalance 默认是20s一次,可以配置
可以通过参数rocketmq.client.rebalance.waitInterval去配置
那么为什么client还要自己去循环Rebalance
原来这里是防止因为网络等其他原因丢失了broker的请求,后续网络回复了,也能进行进行Rebalance
下面我们继续看看Rebalance的实现细节
这里我们以常用的DefaultMQPushConsumerImpl为例
实际这里最终调用的还是抽象类RebalanceImpl的doRebalance方法
可以看到这里的Rebalance是按照topic的维度
我们先理解订阅单个topic的原理
这里的就是先对topic的queue排序,然后对consumer排序, 然后调用AllocateMessageQueueStrategy的allocate方法 这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析
这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2
那么就是 c1:q1、q2 c2:q2、q3
需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue
client什么时候触发Rebalance
上面分析了这么多原理,这里我们总结下client什么时候会触发Rebalance
- consumer启动时会向所有master broker发送心跳,然后broker发送信息通知所有consumer触发Rebalance
- 启动完成后consumer会周期的触发Rebalance,防止因为网络等问题丢失broker的通知而没有Rebalance
- 当consumer停止时,也会通过之前分析的事件机制,触发注销comsuer事件然后通知所有的comsuer触发Rebalance
总结
这里我们详细介绍了client是如何触发Rebalance的,以及触发Rebalance的时机,也介绍了Rebalance的好处。 实际还有很多细节我们限于篇幅暂未分析。 后面我们会继续分析Rebalance的坏处和一些详细的Rebalance算法
作者:weihubeats
链接:
https://juejin.cn/post/7234078632653488184
来源:稀土掘金