准备环境

  • JDK 17
  • Spring Boot 3.2.3
  • RocketMQ(服务端) 5.3.1
  • rocketmq-v5-client-spring-boot-starter(客户端) 2.3.1

在 SpringBoot 项目中依赖如下配置:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.1</version>
</dependency>

如果还未搭建服务端,可以先看第5节-服务器环境搭建

参数配置

按照 SpringBoot 的约定习俗,在上手一个新的 spring-boot-starter项目时,想要知道怎么使用它,看它的 AutoConfiguration 就对了。

rocketmq-v5-client-spring-boot中,对应的 AutoConfiguration 类为 RocketMQAutoConfiguration,其类定义部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class,
ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
// ... 省略

@Bean(PRODUCER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(ProducerBuilderImpl.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"producer.endpoints"})
public ProducerBuilder producerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}

@Bean(SIMPLE_CONSUMER_BUILDER_BEAN_NAME)
@ConditionalOnMissingBean(SimpleConsumerBuilder.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"simple-consumer.endpoints"})
public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQProperties) {
// ... 省略
}

@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
// ... 省略
}
}

可以发现,在rocketmq-v5-client-spring-boot中,根据 RocketMQ 5.x 在架构上做的改进,使用了 endpoints 来替代传统的 namesrvAddr,以支持更灵活的网络拓扑和云原生架构。endpoints 通常指向 RocketMQ 的 Broker 或 Nameserver 地址,用于生产者与 RocketMQ 集群建立连接。endpoints 是一个 URL 或 IP 地址(ip:host)列表(使用;分割)。

⚠️注意:在 RocketMQ 5.x 中,现已默认使用gRPC作为通信协议,entpoints更建议指向 Proxy 地址,一般默认端口为8081。

因此,现在想要启用默认的生产者(ProducerBuilder),只需要配置rocketmq.producer.endpoints即可。

想要启用默认的消费者(SimpleConsumerBuilder),只需要配置rocketmq.simple-consumer.endpoints即可。

RocketMQClientTemplate则是通过判断当前应用上下文是否含有ProducerBuilderSimpleConsumerBuilder Bean对象生成而来。它属于rocketmq-v5-client-spring-boot模块下,也就是说它利用了Spring特性,提供了Spring风格的API,方便开发者通过 Spring 的编程模型来进行消息发送和接收。

既然是原生态的简易使用教程,那么就尽可能在不写多的代码的情况下,实现生产环境中使用MQ。

因此,本次项目就只配置 rocketmq.producer.endpoints 用于启用默认的生产者,消费者使用Push消费模式,所以配置rocketmq.push-consumer.endpoints。配置如下:

1
2
3
4
5
rocketmq:
producer:
endpoints: localhost:8081
push-consumer:
endpoints: localhost:8081

topic在代码中指定,不使用rocketmq.producer.topicrocketmq.push-consumer.topic配置默认的topic。

tips: 在启动客户端服务时,topic需要先创建,否则会启动报错。

生产者生产消息

生产消息通过SpringBoot自动装配的RocketMQClientTemplate对象实现,发送Message对象,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class MyService {
@Autowired
private RocketMQClientTemplate rocketMQClientTemplate;

public void sendMessage() {
byte[] bytes = "这是一个字符串".getBytes(StandardCharsets.UTF_8);
Message<byte[]> message = MessageBuilder.withPayload(bytes).build();
rocketMQClientTemplate.send("MyTopic", message);
}
}

⚠️注意:在 RocketMQ 5.x 中,Message对象已从自定义对象改为spring-messaging包中的Message对象。一般通过MessageBuilder构建,实例对象类型为GenericMessage

消费者消费消息

消费者通过@RocketMQMessageListener注解,并实现RocketMQListener接口消费消息,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@RocketMQMessageListener(consumerGroup = "MyTopic-service", topic = "MyTopic", tag = "*")
public class MyService implements RocketMQListener {

@Override
public ConsumeResult consume(MessageView messageView) {
// 从 MessageView 中获取 ByteBuffer
ByteBuffer byteBuffer = messageView.getBody();

// 转换 ByteBuffer 为字节数组
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);

// 处理字节数组,例如转换为字符串
String messageBody = new String(body, StandardCharsets.UTF_8);

System.out.println("消费消息内容:" + messageBody);

return ConsumeResult.SUCCESS;
}
}

服务端环境搭建

  1. 下载二进制包

    Apache RocketMQ 本地部署 RocketMQ 文档中,可以找到最新的二进制包,位置如下:

    image-20241014163749567

    如果想保持跟本文相同版本,可以直接点击链接下载RocketMQ 5.3.1版本。

  2. 启动NameServer

    1
    2
    3
    4
    5
    6
    #### 启动namesrv
    $ nohup sh bin/mqnamesrv &

    #### 验证namesrv是否启动成功
    $ tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...
  3. 本地模式启动Broker+Proxy

    1
    2
    3
    4
    5
    6
    #### 先启动broker
    $ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

    #### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
    $ tail -f ~/logs/rocketmqlogs/proxy.log
    The broker[broker-a,192.169.1.2:10911] boot success...

    mqbroker脚本默认会读取 conf/broker.conf 配置用于Broker服务。在 conf/rmq-proxy.json 中是Proxy服务的配置,通过 --enable-proxy 命令启动时,需要加上 -pc conf/rmq-proxy.json 参数指定配置文件位置。

    broker.conf的监听端口key为listenPort,管理端口key为brokerAdminPort

    rmq.proxy.json的gRPC请求端口key为grpcServerPort,传统的消息发送和接收请求的端口key为remotingListenPort

  4. 关闭服务

    停止Broker:sh bin/mqshutdown broker

    停止NameServer:sh bin/mqshutdown namesrv

关于RocketMQ的管理命令可以参考Admin Tool