技术人必备!关注我的公众号 「老王的技术时光」
每天分享:最新技术干货 | 实战经验 | 行业趋势 | 个人成长 助你提升:编程技巧 | 架构设计 | 面试心得 | 学习方法
立即关注,订阅获取每周更多精彩内容,持续干货,等你来拿!
一、快速实战:使用Spring Boot与RocketMQ集成
在本节中,我们将通过Spring Boot快速创建一个RocketMQ客户端。以下是如何配置Maven工程并引入必要的依赖。
1. 引入依赖
本示例使用的是 Spring Boot 3.0.4,RocketMQ的Spring Boot Starter版本为 2.3.1。为了确保客户端与服务端版本兼容,我们排除了默认的RocketMQ客户端依赖,并手动引入与Broker相同版本的客户端依赖。请注意,官方并未明确指定Spring Boot与RocketMQ整合的特定版本兼容性,因此在选择版本时需要谨慎。
注意:Spring Boot 3.0.4版本要求JDK版本为17或以上。
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
2. 启动类
创建一个简单的 Spring Boot 启动类:
@SpringBootApplication
public class RocketMQSBApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSBApplication.class, args);
}
}
3. 配置文件
在 application.properties 中配置 RocketMQ:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=springBootGroup
rocketmq.consumer.group=testGroup
server.port=9000
4. 生产者实现
声明生产者,直接使用RocketMQTemplate进行消息发送。
package com.roy.rocketmq.basic;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String msg) {
this.rocketMQTemplate.convertAndSend(topic, msg);
}
}
5. 消费者实现
消费者的声明也很简单。所有属性通过@RocketMQMessageListener注解声明。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
}
}
注意:Spring Boot中对消息的封装与RocketMQ原生API不同,需适应这一封装方式。
二、如何处理不同消息类型
1. 基础消息发送
基础的消息发送机制可以参考
com.roy.rocketmq.SpringRocketTest 单元测试类。
2. 多个Topic消息发送
RocketMQTemplate 实例仅支持发送单一Topic的消息。如果你需要发送至多个Topic,可以通过 @
ExtRocketMQTemplateConfiguration() 注解声明不同的 RocketMQTemplate 实例。
3. 事务消息机制
对于事务消息,我们需要使用 @
RocketMQTransactionListener 注解将事务监听器注入到Spring容器中,在这个注解当中可以通过rocketMQTemplateBeanName属性,指向具体的RocketMQTemplate子类。
三、实现原理
1. RocketMQTemplate
RocketMQTemplate 是 RocketMQ 与 Spring Boot 集成的核心组件,注入过程可以参考
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration 类。不同的集成版本可能不太相同,但大致基本不变。
@Configuration
@EnableConfigurationProperties({RocketMQProperties.class})
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"},matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
}
2. Push 模式消费者
Push 模式消费者通过 @RocketMQMessageListener 注解进行配置。大多数的应用都是基于推模式进行消费消息的,我们详细的过以下细节。
具体的处理逻辑实现位于
rocketmq-spring-boot-2.3.1.jar 中的
ListenerContainerConfiguration 类。此类将
RocketMQMessageListenerContainerRegistrar 注册到 Spring 容器中。
@Configuration
@ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class)
public class ListenerContainerConfiguration {
@Bean
public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);
}
}
注入后,
rocketmq-spring-boot-2.3.1.jar中会另外注入一个
RocketMQMessageListenerBeanPostProcessor对象。这个对象继承了SmartLifecycle接口,因此会在初始化完成后,调用它的start方法。 注解监听注册机制详细可以看下这篇 聊聊RocketMQMessageListener的实现机制-CSDN博客
在这里会调用
RocketMQMessageListenerContainerRegistrar的startContainer方法。 在这个方法中,会启动一个
DefaultRocketMQListenerContainer。
这个
DefaultRocketMQListenerContainer实际上就是将RocketMQ的DefaultMQPushConsumer原生API封装到这个容器中。 start方法实际上就是在启动一个RocketMQ的原生Consumer。 可以看到在
DefaultRocketMQListenerContainer实现了InitializingBean接口,则可以通过afterPropertiesSet方法初始化前进行初始化。initRocketMQPushConsuer方法,就是在创建原生Consuer实例。
仔细分析这个方法就是用来初始化RocketMQ消费者的,它会创建一个RocketMQ原生的DefaultMQPushConsumer消费者,方法很长,抽取出比较关注的重点源码。
private void initRocketMQPushConsumer() throws MQClientException {
.....
//检查并创建consumer对象。
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
// 定制instanceName,有没有很熟悉!!!
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
.....
//设定广播消费还是集群消费。
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
//维护消费者的其他属性。
...
//指定Consumer的消费监听 --》在消费监听中就会去调用onMessage方法。
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
}
3. Pull 模式消费者
实际开发中,拉模式用得比较少。但是,其实RocketMQ针对拉模式也做了非常多的优化。原本提供了一个DefaultMQPullConsumer类,进行拉模式消息消费,DefaultLitePullConsumer在此基础上做了很多优化。有兴趣可以自己研究一下。
Pull 模式的实现通过 DefaultLitePullConsumer 实现。该消费者通过 RocketMQTemplate 实例注入,使用 receive 方法主动拉取消息。此方式适用于特定的拉取需求,并且在高吞吐量场景下有一定优化。
初始化DefaultLitePullConsumer的代码依然是在
rocketmq-spring-boot-2.3.1.jar包中。不过处理类是
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration。
这个配置类会配置在jar包中的spring.factories文件中,通过SpringBoot的自动装载机制加载进来。
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) //解析的springboot配置属性。
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
...
//创建消费者
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
return litePullConsumer;
}
别错过更多精彩内容!
本文仅仅介绍了基础知识,更多技术深度分享敬请关注我的公众号【老王的技术时光】!
每天更新技术干货,涵盖编程技巧、架构设计、面试心得等内容,帮助你快速成长! 现在就关注公众号,获取更多学习资源,提升自己!