⚠️注意:rocketmq-spring-boot-starter 适用于 RocketMQ 4.x 客户端,主要面向依赖于 RocketMQ 4.x 的应用。

Pull与Push消费模式的对比

在 RocketMQ 的内部实现原理中,其实现机制为 PULL 模式,而 PUSH 模式是一种伪推送,是对 PULL 模式的封装,PullCustomer每拉去一批消息后,提交到消费端的线程池(异步),然后马上向 Broker 拉取消息,即实现类似“推”的效果。

从 PULL 模式来看,消息的消费主要包含如下几个方面:

  • 消息拉取,消息拉取模式通过 PULL 相关的 API 从 Broker 指定消息消费队列中拉取一批消息到消费消费客户端,多个消费者需要手动完成队列的分配。
  • 消息消费端处理完消费,需要向 Broker 端报告消息处理队列,然后继续拉取下一批消息。
  • 如果遇到消息消费失败,需要告知 Broker,该条消息消费失败,后续需要重试,通过手动调用 sendMessageBack 方法实现。

而 PUSH 模式就上述这些处理操作无需使用者考虑,只需告诉 RocketMQ 消费者在拉取消息后需要调用的事件监听器即可,消息消费进度的存储、消息消费的重试统一由 RocketMQ Client 来实现。

Pull消费者API

Pull消费者的实现对象为:org.apache.rocketmq.client.consumer.DefaultLitePullConsumer

启动消费者方法:DefaultLitePullConsumer#start() -> DefaultLitePullConsumerImpl#start()

消费消息的方式:手动调用 DefaultLitePullConsumer#poll()

Push消费者API

Push消费者的实现对象为:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

核心参数(在实例化对象后必须手动赋值的参数):MessageListener messageListener

启动消费者方法:DefaultMQPushConsumer#start() -> DefaultMQPushConsumerImpl#start()

在启动消费者时,会调用checkConfig方法检查对象属性配置,其中需要检查messageListener是否不为空,并且是否继承于MessageListenerOrderly或MessageListenerConcurrently。

消费消息的方式:在启动消费者后,消费者内部通过定时调度自动拉取消息并触发消费动作,消费动作会回调到messageListener对象。

DefaultMQPushConsumer消费原理

DefaultMQPushConsumer消费消息的动作是从启动开始的,因此从start方法开始研究。

image-20231115163809893

traceDispatcher是一个异步传输数据接口,默认情况下为null,属于扩展功能。目前主要关注defaultMQPushConsumerImpl.start()即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
 public synchronized void start() throws MQClientException {
// 根据服务状态决定启动动作
switch (this.serviceState) {
// 服务的初始状态
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// 一开始先将服务状态设置为启动失败。。。(个人感觉有点画蛇添足,因为start方法本身已经是synchronized方法,同时switch case判断的其他状态目前都是直接抛异常,这里提前设置为启动失败可能只是担心在启动过程中发生异常而被忽略,防止服务再次被启动而出现未知的异常)
this.serviceState = ServiceState.START_FAILED;

// 检查对象配置
this.checkConfig();

// 拷贝订阅关系
this.copySubscription();

// 当消息模型为集群模式时,如果消费者的实例名称为默认值,则修改为<ip#系统纳秒时间>
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}

// 获取并创建MQ客户端实例
// MQ客户端实例中会包含非常多关键的元素,例如mQClientAPIImpl、mQAdminImpl、pullMessageService、rebalanceService、defaultMQProducer等
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

// 为负载均衡服务设置属性
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

// 设置PullAPIWrapper,该对象主要就是用于拉取消息,是与MQ API客户端的包装器
if (this.pullAPIWrapper == null) {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

// 消息进度存储管理器
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

// 根据消息监听器的类型,初始化消费消息服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

this.consumeMessagePopService =
new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

// 启动消费消息服务,不同服务类型,启动后的逻辑不一样
this.consumeMessageService.start();
this.consumeMessagePopService.start();

// 将当前消费者注册到当前客户端实例的消费者分组缓存中,表示一个客户端下一个消费者分组只能有一个消费者
// 证明了官方的一句话,建议一个客户端只启动同一个消费者消费消息
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

// 客户端实例启动,启动过程会启动mQClientAPIImpl、pullMessageService、rebalanceService、defaultMQProducer,以及启动一系列的定时任务(更新nameserver地址、更新topic路由信息等等)
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

// 更新topic订阅信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}

在整个start启动过程中,涉及到Push消费消息的过程是mQClientFactory.start(),细看其方法实现。

image-20231115172204720

这其中this.pullMessageService.start()就是消费消息的关键,PullMessageService继承于ServiceThread

image-20231115173511872

ServiceThread#start方法实例化了一个Thread,并将当前对象作为Runnable参数,线程调用当前对象的run方法。

image-20231115173521034

PullMessageService#run是一个while循环方法,通过判断内部参数stopped终止循环。循环方法主要分两步:

  1. this.messageRequestQueue.take()是一个LinkedBlockingQueue对象,通过阻塞等待方式等待消息请求。
  2. 通过队列拿到消息请求对象后,调用拉取消息方法,默认调用this.pullMessage((PullRequest)messageRequest)

image-20231115173933395

通过消费者分组名称获取消费者对象,调用消费者对象的pullMessage方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
 public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}

if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
if (offset < 0) {
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
}
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}

pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}

final long beginTimestamp = System.currentTimeMillis();

PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}

classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}

该方法主要是定义了一个内部匿名类PullCallback,再调用this.pullAPIWrapper.pullKernelImpl远程拉取消息,并回调到PullCallbackonSuccessonException方法,正常情况下进入onSuccess方法。

image-20231115175115721

PullResult为FOUND时,表示拉取到消息,调用DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest方法。

image-20231115175205124

this.consumeMessageService是根据消息监听器(MessageListener)的类型而来,当类型为MessageListenerOrderly时,进入ConsumeMessageOrderlyServicesubmitConsumeRequest方法。

image-20231115175226760

this.consumeExecutor是一个线程池对象,因此主要看ConsumeRequest对象的run方法。

image-20231115175328249

run方法主要实现消息监听回调的代码如下:

image-20231115175609836

其中messageListener.consumeMessage(Collections.unmodifiableList(msgs), context)就是回调消息监听器方法。

@RocketMQMessageListener实现原理

DefaultMQPushConsumer是RocketMQ核心库实现Push消费者的类,而RocketMQMessageListenerrocketmq-spring-boot包下的内容,也就是说它是基于SpringBoot实现的。

通过RocketMQAutoConfiguration找到RocketMQListenerConfiguration类。

image-20231116141137907

其注册了一个Bean,beanClass为RocketMQMessageListenerBeanPostProcessor,它实现了BeanPostProcessor接口。

image-20231116141313564

因此就看postProcessAfterInitialization方法,在Bean初始化后,RocketMQMessageListenerBeanPostProcessor判断每个Bean是否含有RocketMQMessageListener注解,对含有注解的类,做两步操作:

  1. enhance增强注解的属性。
  2. 将该Bean注册到ListenerContainerConfiguration容器中。

默认情况下,rocketmq-spring-boot没有实现AnnotationEnhancer接口用于enhance方法,所以主要看第二步listenerContainerConfiguration.registerContainer(beanName, bean, enhance)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
// 获取bean的目标class
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

// 判断beanClass是否即继承于RocketMQListener,又继承于RocketMQReplyListener
if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
}

// 判断beanClass是否继承于RocketMQListener或RocketMQReplyListener
if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
}

// 从注解获取consumerGroup、topic值
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());

// 判断当前监听器是否在指定consumerGroup和topic下开启监听,默认开启
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);

// 未开启的话,日志打印提示
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
// 校验方法,如果注解标识的消息模型是广播模式并且消费模式是有序消费,则抛异常提示
validate(annotation);

// 生成 DefaultRocketMQListenerContainer 的beanName
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

// 向spring容器注册BeanDefinition,BeanClass为DefaultRocketMQListenerContainer,bean对象来源于 createRocketMQListenerContainer 方法
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
// 使用getBean方法,触发当前指定 containerBeanName Bean的生命周期
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
// 判断当前 DefaultRocketMQListenerContainer 的running状态是否启动,默认为false
if (!container.isRunning()) {
try {
// 启动 DefaultRocketMQListenerContainer
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}

// 打印日志通知,某个 RocketMQMessageListener Bean成功注册了一个 DefaultRocketMQListenerContainer Bean到spring容器中
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

该方法主要是做了两个操作:

  1. 向spring容器注册了一个 DefaultRocketMQListenerContainer Bean,并触发Bean生命周期。
  2. 调用了 DefaultRocketMQListenerContainer#start() 方法启动。

细看注册过程的createRocketMQListenerContainer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

// 将注解属性填充到container
container.setRocketMQMessageListener(annotation);

// 赋值nameServer、accessChannel、topic、selectorExpression、consumerGroup、tlsEnable
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.hasLength(nameServer) ? nameServer : rocketMQProperties.getNameServer();
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (StringUtils.hasLength(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (StringUtils.hasLength(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));

// 根据beanClass类型,设置rocketMQListener或rocketMQReplyListener
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}
// 设置spring-messaging的MessageConverter
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
// container的beanName
container.setName(name);

// 设置namespace
String namespace = environment.resolvePlaceholders(annotation.namespace());
container.setNamespace(RocketMQUtil.getNamespace(namespace,
rocketMQProperties.getConsumer().getNamespace()));
return container;
}

这一步主要是将RocketMQMessageListener注解上的属性赋值到DefaultRocketMQListenerContainer中,实例化一个DefaultRocketMQListenerContainer对象。

实例化完后,紧接着进行了一个genericApplicationContext.getBean操作,就是为了触发DefaultRocketMQListenerContainerBean的生命周期方法。

先看DefaultRocketMQListenerContainer的结构:

image-20231116152202404

在Bean的getBean生命周期中,会先后经过ApplicationContextAwareInitializingBean两个接口方法,即setApplicationContextafterPropertiesSet方法。

image-20231116152409223

通过代码可以看出,DefaultRocketMQListenerContainerafterPropertiesSet阶段做了三个操作:

  1. 初始化RocketMQPushConsumer对象。
  2. 获取RocketMQListener<T>RocketMQReplyListener<T, R>中范型T的对象类型。
  3. 获取RocketMQListener<T>RocketMQReplyListener<T, R>onMessage方法的MethodParameter对象。

看一下initRocketMQPushConsumer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private void initRocketMQPushConsumer() throws MQClientException {
if (rocketMQListener == null && rocketMQReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
}
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");

RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);

String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
consumer.setNamesrvAddr(customizedNameServer);
} else {
consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
//set the consumer core thread number and maximum thread number has the same value
consumer.setConsumeThreadMax(consumeThreadNumber);
consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
consumer.setInstanceName(instanceName);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}

switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}

switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}

//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(new Boolean(tlsEnable));

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}

}

initRocketMQPushConsumer方法主要就是实例化了一个DefaultMQPushConsumer对象存储到DefaultRocketMQListenerContainer对象中。

实例化过程中,设置了一大堆属性,包括设置了消费者的消息模型(messageModel)、订阅关系(consumer.subscribe)、消息监听器(consumer.setMessageListener)。

在设置消息监听器中,使用的是内部实现的默认消息监听器。

image-20231116154921998

两者默认实现逻辑差不多,只是入参和出参的类型有些差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

return ConsumeOrderlyStatus.SUCCESS;
}
}

两者都是DefaultRocketMQListenerContainer类的内部类,在处理消息上,都是调用内部的handleMessage方法。

image-20231116155549954

在处理消息时,通过判断RocketMQMessageListener注解标记类实现的接口类是RocketMQListener还是RocketMQReplyListener,决定消息消费的逻辑,当实现接口为RocketMQReplyListener时,表示可回复的消息。

RocketMQReplyListener#onMessage方法返回的<R>会作为一条消息,通过消费者对应的生产者发送,消息的目标topic由原始消息决定,实现细节在MessageUtil.createReplyMessage(messageExt, convertToBytes(message))方法下。

image-20231116160858499

回过头,再看消息监听器的consumeMessage方法在何时被调用,通过IDE的调用链可知,是由ConsumeMessageServiceconsumeMessageDirectly方法所触发。

而在分析DefaultMQPushConsumer消费原理时,在defaultMQPushConsumerImpl.start()阶段,消费者会根据消息监听器(messageListener)的类型决定实例化具体的ConsumeMessageService

image-20231116161318342

MessageListener的具体实现就是DefaultRocketMQListenerContainer里面的DefaultMessageListenerConcurrentlyDefaultMessageListenerOrderly两个内部类。

至此,RocketMQTransactionListenerDefaultMQPushConsumer就关联起来了,两个关联的关键对象就是DefaultRocketMQListenerContainer

需要记住,真正做到消费消息的是RocketMQ核心包中的DefaultMQPushConsumer消费者,而RocketMQTransactionListener是对其做的扩展,通过SpringBoot能力帮助开发者省略了手动创建消费者的过程,让开发者只需要关注想要消费的topic、消费模式以及处理业务消息的业务逻辑等。