第三篇:SpringBoot整合RocketMQ实战+原理解析

技术人必备!关注我的公众号 「老王的技术时光」

每天分享:最新技术干货 | 实战经验 | 行业趋势 | 个人成长 助你提升:编程技巧 | 架构设计 | 面试心得 | 学习方法

立即关注,订阅获取每周更多精彩内容,持续干货,等你来拿!

第一篇:开启RocketMQ之旅~~


一、快速实战:使用Spring Boot与RocketMQ集成

在本节中,我们将通过Spring Boot快速创建一个RocketMQ客户端。以下是如何配置Maven工程并引入必要的依赖。

1. 引入依赖

本示例使用的是 Spring Boot 3.0.4,RocketMQ的Spring Boot Starter版本为 2.3.1。为了确保客户端与服务端版本兼容,我们排除了默认的RocketMQ客户端依赖,并手动引入与Broker相同版本的客户端依赖。请注意,官方并未明确指定Spring Boot与RocketMQ整合的特定版本兼容性,因此在选择版本时需要谨慎。

注意:Spring Boot 3.0.4版本要求JDK版本为17或以上。

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.3.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>3.0.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

2. 启动类

创建一个简单的 Spring Boot 启动类:

@SpringBootApplication
public class RocketMQSBApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQSBApplication.class, args);
    }
}

3. 配置文件

application.properties 中配置 RocketMQ:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=springBootGroup
rocketmq.consumer.group=testGroup
server.port=9000

4. 生产者实现

声明生产者,直接使用RocketMQTemplate进行消息发送。

package com.roy.rocketmq.basic;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String msg) {
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }
}

5. 消费者实现

消费者的声明也很简单。所有属性通过@RocketMQMessageListener注解声明。

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

注意:Spring Boot中对消息的封装与RocketMQ原生API不同,需适应这一封装方式。


二、如何处理不同消息类型

1. 基础消息发送

基础的消息发送机制可以参考
com.roy.rocketmq.SpringRocketTest
单元测试类。

2. 多个Topic消息发送

RocketMQTemplate 实例仅支持发送单一Topic的消息。如果你需要发送至多个Topic,可以通过 @
ExtRocketMQTemplateConfiguration()
注解声明不同的 RocketMQTemplate 实例。

3. 事务消息机制

对于事务消息,我们需要使用 @
RocketMQTransactionListener
注解将事务监听器注入到Spring容器中,在这个注解当中可以通过rocketMQTemplateBeanName属性,指向具体的RocketMQTemplate子类。


三、实现原理

1. RocketMQTemplate

RocketMQTemplate 是 RocketMQ 与 Spring Boot 集成的核心组件,注入过程可以参考
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
类。不同的集成版本可能不太相同,但大致基本不变。

@Configuration
@EnableConfigurationProperties({RocketMQProperties.class})
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"},matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
}

2. Push 模式消费者

Push 模式消费者通过 @RocketMQMessageListener 注解进行配置。大多数的应用都是基于推模式进行消费消息的,我们详细的过以下细节。

具体的处理逻辑实现位于
rocketmq-spring-boot-2.3.1.jar
中的
ListenerContainerConfiguration
类。此类将
RocketMQMessageListenerContainerRegistrar
注册到 Spring 容器中。

@Configuration
@ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class)
public class ListenerContainerConfiguration {
    @Bean
    public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
        return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);
    }
}

注入后,
rocketmq-spring-boot-2.3.1.jar
中会另外注入一个
RocketMQMessageListenerBeanPostProcessor
对象。这个对象继承了SmartLifecycle接口,因此会在初始化完成后,调用它的start方法。 注解监听注册机制详细可以看下这篇 聊聊RocketMQMessageListener的实现机制-CSDN博客



在这里会调用
RocketMQMessageListenerContainerRegistrar
startContainer方法。 在这个方法中,会启动一个
DefaultRocketMQListenerContainer。



这个
DefaultRocketMQListenerContainer
实际上就是将RocketMQ的DefaultMQPushConsumer原生API封装到这个容器中。 start方法实际上就是在启动一个RocketMQ的原生Consumer。 可以看到在
DefaultRocketMQListenerContainer
实现了InitializingBean接口,则可以通过afterPropertiesSet方法初始化前进行初始化。initRocketMQPushConsuer方法,就是在创建原生Consuer实例。



仔细分析这个方法就是用来初始化RocketMQ消费者的,它会创建一个RocketMQ原生的DefaultMQPushConsumer消费者,方法很长,抽取出比较关注的重点源码。

private void initRocketMQPushConsumer() throws MQClientException {
       .....
        //检查并创建consumer对象。
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
        // 定制instanceName,有没有很熟悉!!!
        consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
        .....
        //设定广播消费还是集群消费。
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }
        //维护消费者的其他属性。   
        ...
           //指定Consumer的消费监听 --》在消费监听中就会去调用onMessage方法。
           switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
    }

3. Pull 模式消费者

实际开发中,拉模式用得比较少。但是,其实RocketMQ针对拉模式也做了非常多的优化。原本提供了一个DefaultMQPullConsumer类,进行拉模式消息消费,DefaultLitePullConsumer在此基础上做了很多优化。有兴趣可以自己研究一下。

Pull 模式的实现通过 DefaultLitePullConsumer 实现。该消费者通过 RocketMQTemplate 实例注入,使用 receive 方法主动拉取消息。此方式适用于特定的拉取需求,并且在高吞吐量场景下有一定优化。

初始化DefaultLitePullConsumer的代码依然是在
rocketmq-spring-boot-2.3.1.jar
包中。不过处理类是
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

这个配置类会配置在jar包中的spring.factories文件中,通过SpringBoot的自动装载机制加载进来。

 @Bean(CONSUMER_BEAN_NAME)
    @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) //解析的springboot配置属性。
    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
            throws MQClientException {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
        
        ...
        //创建消费者   
        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
        litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
        litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
        litePullConsumer.setNamespace(consumerConfig.getNamespace());
        return litePullConsumer;
    }

别错过更多精彩内容!

本文仅仅介绍了基础知识,更多技术深度分享敬请关注我的公众号【老王的技术时光】!

每天更新技术干货,涵盖编程技巧、架构设计、面试心得等内容,帮助你快速成长! 现在就关注公众号,获取更多学习资源,提升自己!

第一篇:开启RocketMQ之旅~~

第二篇:RocketMQ应用实战~~

原文链接:,转发请注明来源!