RocketMQ你不得不了解的 Rebalance机制源码分析

RocketMQ版本

  • version: 5.1.0

RocketMQ中consumer消费模型

在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型

我们知道在我们创建topic的时候需要指定一个参数就是读队列数

这里假设我们的topic是xiaozoujishu-topic,我们的读队列数 是4个,我们同一gid下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢 首先需要明确的是:

  1. 这里我们的消费模式是集群消费
  2. queue的负载均衡算法是使用默认的AllocateMessageQueueAveragely(平均分配) 假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:

四个队列分配给一个消费者

此时如果我们再启动一个消费者,那么这时候就会进行Rebalance,然后此时我们的队列分配就变成如下:

所以通过上面的队列分配我就知道Rebalance是个啥了,我们下面对Rebalance进行一些定义

RocketMQ的Rebalance是什么

Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配

Rebalance的目的

从上面可以看出Rebalance的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力

但是Rebalance也带来了一些危害,后面我们会重点分析下

Rebalance的触发原因

我们这里先说结论

  1. 订阅Topic的队列数量变化
  2. 消费者组信息变化

这里是最深层的原因,就是topic的队列数量、消费组信息 实际我们可以将这些归结为Rebalance的元数据,这些元数据的变更,就会引起clinet的Rebalance

注意RocketMQ的Rebalance是发生在client

这些元数据都在管broker管理 核心就是这三个类

  • TopicConfigManager
  • SubscriptionGroupManager
  • ConsumerManager

只要这个三个类的信息有变化,client就会进行Rebalance。 下面我们可以具体说下什么情况下会让这三个类变化

订阅Topic的队列数量变化

什么情况下订阅Topic的队列数量会变化呢?

  1. broker扩容
  2. broker缩容
  3. broker宕机(本质也是类似缩容)

消费者组信息变化

什么时候消费者组信息会变化呢?

核心就是consumer的上下线,具体细分又可以分为如下原因:

  1. 服务日常滚动升级
  2. 服务扩容
  3. 服务订阅消息发生变化

源码分析

上面大致介绍了Rebalance的触发原因,现在我们结合源码来具体分析下

我们就从consumer的启动开始分析吧

这里我们以最简单的demo为例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    //return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

System.out.printf("Consumer Started.%n");

这里我们直接注意到 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 这个方法,看名字就知道是client向所有的broker发送心跳

我们进入到sendHeartbeatToAllBrokerWithLock方法看看

private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }

    if (this.brokerAddrTable.isEmpty()) {
        return;
    }
    long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
        String brokerName = brokerClusterInfo.getKey();
        HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
        if (oneTable == null) {
            continue;
        }
        for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
            Long id = singleBrokerInstance.getKey();
            String addr = singleBrokerInstance.getValue();
            if (addr == null) {
                continue;
            }
            if (consumerEmpty && MixAll.MASTER_ID != id) {
                continue;
            }

          try {
              int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
              if (!this.brokerVersionTable.containsKey(brokerName)) {
                  this.brokerVersionTable.put(brokerName, new HashMap<>(4));
              }
             this.brokerVersionTable.get(brokerName).put(addr, version);
             if (times % 20 == 0) {
                 log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                 log.info(heartbeatData.toString());
             }
        } catch (Exception e) {
            if (this.isBrokerInNameServer(addr)) {
                log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
            } else {
                log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e);
            }
        }
    }
}

这段代码主要是通过this.brokerAddrTable.entrySet()获取到所有的master broker地址,然后进行心跳发送

具体的心跳发送代码实际是在下面代码中进行的

int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());

我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT

我们看看RequestCode.HEART_BEAT的调用找到`broker的处理逻辑

很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor类的processRequest

我们具体进去看看这个方法

可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看

进去这个方法我们能看到一个比较核心的方法 registerConsumer

很明显这个方法就是注册consumer的方法

这个方法里面和Rebalance相关比较核心的方法就是这三个

  1. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);

这里我们可以看看clientChannelInfo里面是个啥玩意

具体深入到updateChannel方法里面就是判断是否为新的client,是就更新channelInfoTable

  1. updateSubscription

这个方法就是判断订阅关系是否发生了变化并更新订阅关系

  1. callConsumerIdsChangeListener
  • callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); 这个方法就是通知client进行Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE事件

这里我们可以简单看看事件定义的类型有哪些

我们直接看看具体的事件处理类

可以看到实现类有多个,我们直接看broker模块的DefaultConsumerIdsChangeListener类即可

可以看到这里是给该group所有的client发送Rebalance消息

具体的消息状态码是 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED

client Rebalance

通过上面我们大致找到了整个通信过程,但是实际的Rebalance是发生在client,所以我们还是需要继续回到client的代码

我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED 找到client的处理类ClientRemotingProcessor

实际处理方法就是

this.mqClientFactory.rebalanceImmediately();

我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance线程

所以实际的方法调用还是在RebalanceService的 run方法

最终还是调用的是MQConsumerInner接口中的doRebalance方法

这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?

原来是cleint也会定时去Rebalance 默认是20s一次,可以配置

可以通过参数rocketmq.client.rebalance.waitInterval去配置

那么为什么client还要自己去循环Rebalance

原来这里是防止因为网络等其他原因丢失了broker的请求,后续网络回复了,也能进行进行Rebalance

下面我们继续看看Rebalance的实现细节

这里我们以常用的DefaultMQPushConsumerImpl为例

实际这里最终调用的还是抽象类RebalanceImpl的doRebalance方法

可以看到这里的Rebalance是按照topic的维度

我们先理解订阅单个topic的原理

这里的就是先对topic的queue排序,然后对consumer排序, 然后调用AllocateMessageQueueStrategy的allocate方法 这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析

这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2

那么就是 c1:q1、q2 c2:q2、q3

需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue

client什么时候触发Rebalance

上面分析了这么多原理,这里我们总结下client什么时候会触发Rebalance

  1. consumer启动时会向所有master broker发送心跳,然后broker发送信息通知所有consumer触发Rebalance
  2. 启动完成后consumer会周期的触发Rebalance,防止因为网络等问题丢失broker的通知而没有Rebalance
  3. 当consumer停止时,也会通过之前分析的事件机制,触发注销comsuer事件然后通知所有的comsuer触发Rebalance

总结

这里我们详细介绍了client是如何触发Rebalance的,以及触发Rebalance的时机,也介绍了Rebalance的好处。 实际还有很多细节我们限于篇幅暂未分析。 后面我们会继续分析Rebalance的坏处和一些详细的Rebalance算法

作者:weihubeats
链接:
https://juejin.cn/post/7234078632653488184

来源:稀土掘金

原文链接:,转发请注明来源!