同学习RocketMQ-生产者之消息发送一样,为了更快速上手,所以直接从rocketmq-spring-boot-samples示例开始。

⚠️注意:rocketmq-spring-boot-starter 适用于 RocketMQ 4.x 客户端,主要面向依赖于 RocketMQ 4.x 的应用。

rocketmq-consume-demo

进入到 rocketmq-spring-boot-samples 项目中,会发现如下模块:

image-20231101103949080

与生产者一样,消费者的demo也分为了普通demo和acl-demo两种,recketmq-consume-demo的项目结构如下:

image-20231108173034800

ConsumerApplication是一个SpringBootApplication,主要是注入了RocketMQTemplate对象,使用其消费消息。代码如下:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {

@Resource
private RocketMQTemplate rocketMQTemplate;

@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
//This is an example of pull consumer using rocketMQTemplate.
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

//This is an example of pull consumer using extRocketMQTemplate.
messages = extRocketMQTemplate.receive(String.class);
System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
}
}

通过run方法可以看到rocketMQTemplate.receive就是消费者消费消息的实现方法,关键对象就是RocketMQTemplate,与生产者示例一样,它是通过spring依赖注入进来的,并且两者是同一个RocketMQTemplate对象。

RocketMQTemplate对象注册于RocketMQAutoConfiguration。具体细节可回看:RocketMQ-生产者之消息发送

消费者消费消息原理

跟踪进入到rocketMQTemplate#receive方法中。

image-20231114142146956

receive方法主要接收两个参数,class是消息对象类型,timeout表示接收消息的超时时间(默认毫秒)。消息对象类型比较好解释,就是这个消息的对象。而接收消息的超时时间是指等待消息的时间,当消息队列有消息时是不需要进行等待的,这里的等待超时时间是指当消息队列没有消息时,RocketMQ默认采用乐观消费思想,认为在等待指定时间内会有消息生产出来。

消费消息主要是consumer.poll方法实现,返回的消息对象是MessageExt,由RocketMQTemplate将其转换为指定的消息对象类型clazz。转换方法(doConvertMessage)如下:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 private <T> T doConvertMessage(MessageExt messageExt, Class<T> messageType) {
if (Objects.equals(messageType, MessageExt.class)) {
return (T) messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return (T) str;
} else {
// If msgType not string, use objectMapper change it.
try {
return (T) this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}

messageExt.getBody()返回的是消息的字节数组,转换成指定对象类型时,都是先将其转为字符串,再通过spring-messaging的MessageConverter做对象转换。

再看consumer.poll的具体实现,consumer的默认对象为DefaultLitePullConsumer

image-20231114144317897

与生产者发送消息的逻辑一样,最终消费消息的实现都是由实现类DefaultLitePullConsumerImpl实现的。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public synchronized List<MessageExt> poll(long timeout) {
try {
checkServiceState();
if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
}

if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
}
// 拉取消息的结束时间戳
long endTime = System.currentTimeMillis() + timeout;

// 这里利用了BlockingQueue的获取队列元素的特性
// 从队列中获取消息,如果队列消息为空,则队列等待指定时间后再返回
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

// 当前时间戳小于结束时间戳
if (endTime - System.currentTimeMillis() > 0) {
// 如果从队列获取的消息不为空,并且消息是废弃的,则重新从队列中获取
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0) {
break;
}
}
}

// 如果从队列获取的消息不为空, 并且消息不是废弃的
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
// 从消息队列的msgTreeMap移除这些消息,并返回消息偏移量最靠前的元素
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
// 更新消息队列的消费偏移量
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
// If namespace not null , reset Topic without namespace.
this.resetTopic(messages);

// 消费消息的Hook
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(consumeRequest.getMessageQueue());
consumeMessageContext.setMsgList(messages);
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
// 设置消息队列最后消费的时间戳
consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
return messages;
}
} catch (InterruptedException ignore) {

}

return Collections.emptyList();
}

poll方法是synchronized方法,其目的是为了保证同一时间一个消费者只会从消息缓存队列获取一次消息,避免消息的重复消费。这里面的关键在于consumeRequestCache缓存队列何时有消息进入队列。

因此就找consumeRequestCache对象何时插入元素,整个对象中只有一处调用了consumeRequestCache.put(consumeRequest)方法。

image-20231114162527102

而调用submitConsumeRequest方法的地方只有一处,即DefaultLitePullConsumerImpl.PullTaskImpl#run方法。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ....
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) {
// 表示发现消息
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
// 添加到消息缓存队列
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
// ....

该方法是由定时线程池触发的,在实例化DefaultLitePullConsumerImpl.PullTaskImpl对象后,将其放入线程池定时执行run方法。(实例化该对象是由RocketMQ消费者负载均衡实现的)

image-20231114163708337

到此,消费者消费消息的原理基本就大致理清了。

小结

  1. 通过DefaultLitePullConsumer手动触发poll方法消费消息。
  2. 通过DefaultLitePullConsumerImpl对象获取消息缓存队列(consumeRequestCache)的消息。
  3. 而消息缓存队列的数据来源于DefaultLitePullConsumerImpl内置的定时线程池,通过触发DefaultLitePullConsumerImpl.PullTaskImplrun方法从RocketMQ消息队列拉取消息。

消费者核心参数

实际上,RocketMQ消费者有两类,一种是Pull模式,一种是Push模式。而rocketmq-consume-demo中演示的是Pull模式,因此这里主要介绍DefaultLitePullConsumer消费者的核心参数。后期有机会再专门介绍DefaultMQPushConsumer

DefaultLitePullConsumer核心方法

  • void start()

    启动消费者。

  • void shutdown()

    关闭消费者。

  • void subscribe(String topic, String subExpression)

    按照主题与消息过滤表达式进行订阅。

  • void subscribe(String topic, MessageSelector selector)

    按照主题与过滤表达式订阅消息,过滤表达式可通过 MessageSelector 的 bySql、byTag 来创建,这个与 PUSH 模式类似,故不重复展开。

    温馨提示:通过 subscribe 方式订阅 Topic,具备消息消费队列的重平衡,即如果消费消费者数量、主题的队列数发生变化时,各个消费者订阅的队列信息会动态变化。

  • void unsubscribe(String topic)

    取消订阅。

  • void assign(Collection< MessageQueue > messageQueues)

    收到指定该消费者消费的队列,这种消费模式不具备消息消费队列的自动重平衡。

  • List poll()

    消息拉取 API,默认超时时间为 5s。

  • List poll(long timeout)

    消息拉取 API,可指定消息拉取超时时间。

  • void seek(MessageQueue messageQueue, long offset)

    改变下一次消息拉取的偏移量,即改变 poll() 方法下一次运行的拉取消息偏移量,类似于回溯或跳过消息,注意:如果设置的 offset 大于当前消费队列的消费偏移量,就会造成部分消息直接跳过没有消费,使用时请慎重。

  • void seekToBegin(MessageQueue messageQueue)

    改变下一次消息拉取的偏移量到消息队列最小偏移量。其效果相当于重新来过一次。

  • void seekToEnd(MessageQueue messageQueue)

    该变下一次消息拉取偏移量到队列的最大偏移量,即跳过当前所有的消息,从最新的偏移量开始消费。

  • void pause(Collection< MessageQueue > messageQueues)

    暂停消费,支持将某些消息消费队列挂起,即 poll() 方法在下一次拉取消息时会暂时忽略这部分消息消费队列,可用于消费端的限流。

  • void resume(Collection< MessageQueue > messageQueues)

    恢复消费。

  • boolean isAutoCommit()

    是否自动提交消费位点,Lite Pull 模式下可设置是否自动提交位点。

  • void setAutoCommit(boolean autoCommit)

    设置是否自动提交位点。

  • Collection fetchMessageQueues(String topic)

    获取 Topic 的路由信息。

  • Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp)

    根据时间戳查找最接近该时间戳的消息偏移量。

  • void commitSync()

    手动提交消息消费位点,在集群消费模式下,调用该方法只是将消息偏移量提交到 OffsetStore 在内存中,并不是实时向 Broker 提交位点,位点的提交还是按照定时任务定时向 Broker 汇报。

  • Long committed(MessageQueue messageQueue)

    获取该消息消费队列已提交的消费位点(从 OffsetStore 中获取,即集群模式下会向 Broker 中的消息消费进度文件中获取。

  • void registerTopicMessageQueueChangeListener(String topic,TopicMessageQueueChangeListener listener)

    注册主题队列变化事件监听器,客户端会每 30s 查询一下 订阅的 Topic 的路由信息(队列信息)的变化情况,如果发生变化,会调用注册的事件监听器。

  • void updateNameServerAddress(String nameServerAddress)

    更新 NameServer 的地址。

DefaultLitePullConsumer核心属性

  • String consumerGroup

    消息消费组。

  • long brokerSuspendMaxTimeMillis = 1000 * 20

    长轮询模式,如果开启长轮询模式,当 Broker 收到客户端的消息拉取请求时如果当时并没有新的消息,可以在 Broker 端挂起当前请求,一旦新消息到达则唤醒线程,从 Broker 端拉取消息后返回给客户端,该值设置在 Broker 等待的最大超时时间,默认为 20s,建议保持默认值即可。

  • long consumerTimeoutMillisWhenSuspend = 1000 * 30

    消息消费者拉取消息最大的超时时间,该值必须大于 brokerSuspendMaxTimeMillis,默认值为 30s,同样不建议修改该值。

  • long consumerPullTimeoutMillis = 1000 * 10

    客户端与 Broker 建立网络连接的最大超时时间,默认为 10s。

  • MessageModel messageModel = MessageModel.CLUSTERING

    消息组消费模型,可选值:集群模式、广播模式。

  • MessageQueueListener messageQueueListener

    消息消费负载队列变更事件。

  • OffsetStore offsetStore

    消息进度存储管理器,该属性为私有属性,不能通过 API 进行修改,该参数主要是根据消费模式在内部自动创建,RocketMQ 在广播消息、集群消费两种模式下消息消费进度的存储策略会有所不同。

    • 集群模式:RocketMQ 会将消息消费进度存储在 Broker 服务器,存储路径为 ${ROCKET_HOME}/store/config/ consumerOffset.json 文件中。
    • 广播模式:RocketMQ 会将消息消费进存在在消费端所在的机器上,存储路径为 ${user.home}/.rocketmq_offsets 中。

    为了方便大家对消息消费进度有一个直接的理解,下面给出我本地测试时 Broker 集群中的消息消费进度文件,其截图如下:

    1

    消息消费进度,首先使用 topic@consumerGroup 为键,其值是一个 Map,键为 Topic 的队列序列,值为当前的消息消费位点。

  • AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely()

    消息队列负载算法。主要解决的问题是消息消费队列在各个消费者之间的负载均衡策略,例如一个 Topic 有8个队列,一个消费组中有3个消费者,那这三个消费者各自去消费哪些队列。

    RocketMQ 默认提供了如下负载均衡算法:

    • AllocateMessageQueueAveragely:平均连续分配算法。
    • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
    • AllocateMachineRoomNearby:机房内优先就近分配。
    • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
    • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set<String> consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
  • boolean autoCommit = true

    设置是否提交消息消费进度,默认为 true。

  • int pullThreadNums = 20

    消息拉取线程数量,默认为 20 个,注意这个是每一个消费者默认 20 个线程往 Broker 拉取消息。这个应该是 Lite PULL 模式对比 PUSH 模式一个非常大的优势。

  • long autoCommitIntervalMillis = 5 * 1000

    自动汇报消息位点的间隔时间,默认为 5s。

  • int pullBatchSize = 10

    一次消息拉取最多返回的消息条数,默认为 10。

  • long pullThresholdForAll = 10000

    针对所有队列的消息消费请求数触发限流的阔值,默认为 10000。

  • int consumeMaxSpan = 2000

    单个消息处理队列中最大消息偏移量与最小偏移量的差值触发限流的阔值,默认为 2000。

  • int pullThresholdForQueue = 1000

    对于单个队列挤压的消息条数触发限流的阔值,默认为 1000,即如果某一个队列在本地挤压超过 1000 条消息,则停止消息拉取。

  • int pullThresholdSizeForQueue = 100

    对于单个队列挤压的消息总大小触发限流的阔值,默认为 100M。

  • long pollTimeoutMillis = 1000 * 5

    一次消息拉取默认的超时时间为 5s。

  • long topicMetadataCheckIntervalMillis = 30 * 1000

    topic 路由信息更新频率,默认 30s 更新一次。

  • ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET

    一个消费者初次启动时(即消费进度管理器中无法查询到该消费组的进度)时从哪个位置开始消费的策略,可选值如下所示:

    • CONSUME_FROM_LAST_OFFSET:从最新的消息开始消费。
    • CONSUME_FROM_FIRST_OFFSET:从最新的位点开始消费。
    • CONSUME_FROM_TIMESTAMP:从指定的时间戳开始消费,这里的实现思路是从 Broker 服务器寻找消息的存储时间小于或等于指定时间戳中最大的消息偏移量的消息,从这条消息开始消费。
  • String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30))

    指定从什么时间戳开始消费,其格式为 yyyyMMddHHmmss,默认值为 30 分钟之前,该参数只在 consumeFromWhere 为 CONSUME_FROM_TIMESTAMP 时生效。

封装消费者工具

主要是对Consumer的封装,实现自定义方法自动消费消息,类似于rocketmq-spring-boot提供的RocketMQMessageListener功能。

使用示例:

java
1
2
3
4
@MQConsumerListener
public void consumeListenerByPush(UserVo msg) {
System.out.printf("消费者监听[%s]: %s\n", Thread.currentThread().getName(), msg);
}

@MQConsumerListener注解结构如下:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MQConsumerListener {

String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";

/**
* 消费者名称,即Bean名称
*/
String value() default "";

/**
* 消费监听模式
*/
ConsumeListeningMode consumeListeningMode() default ConsumeListeningMode.PUSH;

/**
* 消息监听规则
* <p>仅在 ConsumeListeningMode.PUSH 模式下生效</p>
*/
MessageListeningRule messageListeningRule() default MessageListeningRule.ORDERLY;

String nameServer() default NAME_SERVER_PLACEHOLDER;

String topic() default TOPIC_PLACEHOLDER;

String group() default GROUP_PLACEHOLDER;

MessageModel messageModel() default MessageModel.CLUSTERING;

SelectorType selectorType() default SelectorType.TAG;

String selectorExpression() default "*";

}

具体源代码地址:rocketmq-consumer-demo