⚠️注意:rocketmq-spring-boot-starter
适用于 RocketMQ 4.x 客户端 ,主要面向依赖于 RocketMQ 4.x 的应用。
Pull与Push消费模式的对比
在 RocketMQ 的内部实现原理中,其实现机制为 PULL 模式,而 PUSH 模式是一种伪推送,是对 PULL 模式的封装,PullCustomer每拉去一批消息后,提交到消费端的线程池(异步),然后马上向 Broker 拉取消息,即实现类似“推”的效果。
从 PULL 模式来看,消息的消费主要包含如下几个方面:
消息拉取,消息拉取模式通过 PULL 相关的 API 从 Broker 指定消息消费队列中拉取一批消息到消费消费客户端,多个消费者需要手动完成队列的分配。
消息消费端处理完消费,需要向 Broker 端报告消息处理队列,然后继续拉取下一批消息。
如果遇到消息消费失败,需要告知 Broker,该条消息消费失败,后续需要重试,通过手动调用 sendMessageBack 方法实现。
而 PUSH 模式就上述这些处理操作无需使用者考虑,只需告诉 RocketMQ 消费者在拉取消息后需要调用的事件监听器即可,消息消费进度的存储、消息消费的重试统一由 RocketMQ Client 来实现。
Pull消费者API
Pull消费者的实现对象为:org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
启动消费者方法:DefaultLitePullConsumer#start() -> DefaultLitePullConsumerImpl#start()
消费消息的方式:手动调用 DefaultLitePullConsumer#poll()
Push消费者API
Push消费者的实现对象为:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
核心参数(在实例化对象后必须手动赋值的参数):MessageListener messageListener
启动消费者方法:DefaultMQPushConsumer#start() -> DefaultMQPushConsumerImpl#start()
在启动消费者时,会调用checkConfig
方法检查对象属性配置,其中需要检查messageListener是否不为空,并且是否继承于MessageListenerOrderly或MessageListenerConcurrently。
消费消息的方式:在启动消费者后,消费者内部通过定时调度自动拉取消息并触发消费动作,消费动作会回调到messageListener对象。
DefaultMQPushConsumer消费原理
DefaultMQPushConsumer
消费消息的动作是从启动开始的,因此从start
方法开始研究。
traceDispatcher
是一个异步传输数据接口,默认情况下为null,属于扩展功能。目前主要关注defaultMQPushConsumerImpl.start()
即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 public synchronized void start () throws MQClientException { switch (this .serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}" , this .defaultMQPushConsumer.getConsumerGroup(), this .defaultMQPushConsumer.getMessageModel(), this .defaultMQPushConsumer.isUnitMode()); this .serviceState = ServiceState.START_FAILED; this .checkConfig(); this .copySubscription(); if (this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this .defaultMQPushConsumer.changeInstanceNameToPID(); } this .mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this .defaultMQPushConsumer, this .rpcHook); this .rebalanceImpl.setConsumerGroup(this .defaultMQPushConsumer.getConsumerGroup()); this .rebalanceImpl.setMessageModel(this .defaultMQPushConsumer.getMessageModel()); this .rebalanceImpl.setAllocateMessageQueueStrategy(this .defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this .rebalanceImpl.setmQClientFactory(this .mQClientFactory); if (this .pullAPIWrapper == null ) { this .pullAPIWrapper = new PullAPIWrapper ( mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); } this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this .defaultMQPushConsumer.getOffsetStore() != null ) { this .offsetStore = this .defaultMQPushConsumer.getOffsetStore(); } else { switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this .offsetStore = new LocalFileOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; case CLUSTERING: this .offsetStore = new RemoteBrokerOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; default : break ; } this .defaultMQPushConsumer.setOffsetStore(this .offsetStore); } this .offsetStore.load(); if (this .getMessageListenerInner() instanceof MessageListenerOrderly) { this .consumeOrderly = true ; this .consumeMessageService = new ConsumeMessageOrderlyService (this , (MessageListenerOrderly) this .getMessageListenerInner()); this .consumeMessagePopService = new ConsumeMessagePopOrderlyService (this , (MessageListenerOrderly) this .getMessageListenerInner()); } else if (this .getMessageListenerInner() instanceof MessageListenerConcurrently) { this .consumeOrderly = false ; this .consumeMessageService = new ConsumeMessageConcurrentlyService (this , (MessageListenerConcurrently) this .getMessageListenerInner()); this .consumeMessagePopService = new ConsumeMessagePopConcurrentlyService (this , (MessageListenerConcurrently) this .getMessageListenerInner()); } this .consumeMessageService.start(); this .consumeMessagePopService.start(); boolean registerOK = mQClientFactory.registerConsumer(this .defaultMQPushConsumer.getConsumerGroup(), this ); if (!registerOK) { this .serviceState = ServiceState.CREATE_JUST; this .consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException ("The consumer group[" + this .defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null ); } mQClientFactory.start(); log.info("the consumer [{}] start OK." , this .defaultMQPushConsumer.getConsumerGroup()); this .serviceState = ServiceState.RUNNING; break ; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException ("The PushConsumer service state not OK, maybe started once, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); default : break ; } this .updateTopicSubscribeInfoWhenSubscriptionChanged(); this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately(); }
在整个start启动过程中,涉及到Push消费消息的过程是mQClientFactory.start()
,细看其方法实现。
这其中this.pullMessageService.start()
就是消费消息的关键,PullMessageService
继承于ServiceThread
。
ServiceThread#start
方法实例化了一个Thread,并将当前对象作为Runnable参数,线程调用当前对象的run方法。
PullMessageService#run
是一个while循环方法,通过判断内部参数stopped
终止循环。循环方法主要分两步:
this.messageRequestQueue.take()
是一个LinkedBlockingQueue
对象,通过阻塞等待方式等待消息请求。
通过队列拿到消息请求对象后,调用拉取消息方法,默认调用this.pullMessage((PullRequest)messageRequest)
。
通过消费者分组名称获取消费者对象,调用消费者对象的pullMessage
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 public void pullMessage (final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped." , pullRequest.toString()); return ; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this .makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok" , e); this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return ; } if (this .isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}" , this .defaultMQPushConsumer.getInstanceName(), this .defaultMQPushConsumer.getConsumerGroup()); this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return ; } long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024 ); if (cachedMessageCount > this .defaultMQPushConsumer.getPullThresholdForQueue()) { this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000 ) == 0 ) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}" , this .defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return ; } if (cachedMessageSizeInMiB > this .defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000 ) == 0 ) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}" , this .defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return ; } if (!this .consumeOrderly) { if (processQueue.getMaxSpan() > this .defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000 ) == 0 ) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}" , processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return ; } } else { if (processQueue.isLocked()) { if (!pullRequest.isPreviouslyLocked()) { long offset = -1L ; try { offset = this .rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); if (offset < 0 ) { throw new MQClientException (ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset); } } catch (Exception e) { this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}" , pullRequest, e); return ; } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}" , pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}" , pullRequest, offset); } pullRequest.setPreviouslyLocked(true ); pullRequest.setNextOffset(offset); } } else { this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}" , pullRequest); return ; } } final SubscriptionData subscriptionData = this .rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}" , pullRequest); return ; } final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback () { @Override public void onSuccess (PullResult pullResult) { if (pullResult != null ) { pullResult = DefaultMQPushConsumerImpl.this .pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this .getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this .executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0 ).getQueueOffset(); DefaultMQPushConsumerImpl.this .getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this .consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this .defaultMQPushConsumer.getPullInterval() > 0 ) { DefaultMQPushConsumerImpl.this .executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this .defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this .executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}" , pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break ; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this .correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this .executePullRequestImmediately(pullRequest); break ; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}" , pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true ); DefaultMQPushConsumerImpl.this .executeTaskLater(new Runnable () { @Override public void run () { try { DefaultMQPushConsumerImpl.this .offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false ); DefaultMQPushConsumerImpl.this .offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this .rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}" , pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception" , e); } } }, 10000 ); break ; default : break ; } } } @Override public void onException (Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception" , e); } DefaultMQPushConsumerImpl.this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }; boolean commitOffsetEnable = false ; long commitOffsetValue = 0L ; if (MessageModel.CLUSTERING == this .defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this .offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0 ) { commitOffsetEnable = true ; } } String subExpression = null ; boolean classFilter = false ; SubscriptionData sd = this .rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null ) { if (this .defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, true , subExpression != null , classFilter ); try { this .pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this .defaultMQPushConsumer.getPullBatchSize(), this .defaultMQPushConsumer.getPullBatchSizeInBytes(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception" , e); this .executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
该方法主要是定义了一个内部匿名类PullCallback
,再调用this.pullAPIWrapper.pullKernelImpl
远程拉取消息,并回调到PullCallback
的onSuccess
或onException
方法,正常情况下进入onSuccess
方法。
当PullResult
为FOUND时,表示拉取到消息,调用DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
方法。
this.consumeMessageService
是根据消息监听器(MessageListener)的类型而来,当类型为MessageListenerOrderly
时,进入ConsumeMessageOrderlyService
的submitConsumeRequest
方法。
this.consumeExecutor
是一个线程池对象,因此主要看ConsumeRequest
对象的run
方法。
run
方法主要实现消息监听回调的代码如下:
其中messageListener.consumeMessage(Collections.unmodifiableList(msgs), context)
就是回调消息监听器方法。
@RocketMQMessageListener实现原理
DefaultMQPushConsumer
是RocketMQ核心库实现Push消费者的类,而RocketMQMessageListener
是rocketmq-spring-boot
包下的内容,也就是说它是基于SpringBoot实现的。
通过RocketMQAutoConfiguration
找到RocketMQListenerConfiguration
类。
其注册了一个Bean,beanClass为RocketMQMessageListenerBeanPostProcessor
,它实现了BeanPostProcessor
接口。
因此就看postProcessAfterInitialization
方法,在Bean初始化后,RocketMQMessageListenerBeanPostProcessor
判断每个Bean是否含有RocketMQMessageListener
注解,对含有注解的类,做两步操作:
enhance
增强注解的属性。
将该Bean注册到ListenerContainerConfiguration
容器中。
默认情况下,rocketmq-spring-boot没有实现AnnotationEnhancer
接口用于enhance
方法,所以主要看第二步listenerContainerConfiguration.registerContainer(beanName, bean, enhance)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void registerContainer (String beanName, Object bean, RocketMQMessageListener annotation) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException (clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException (clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } String consumerGroup = this .environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this .environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean ) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true ); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization." , consumerGroup, topic); return ; } validate(annotation); String containerBeanName = String.format("%s_%s" , DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}" , container, e); throw new RuntimeException (e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}" , beanName, containerBeanName); }
该方法主要是做了两个操作:
向spring容器注册了一个 DefaultRocketMQListenerContainer Bean,并触发Bean生命周期。
调用了 DefaultRocketMQListenerContainer#start() 方法启动。
细看注册过程的createRocketMQListenerContainer
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private DefaultRocketMQListenerContainer createRocketMQListenerContainer (String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer (); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.hasLength(nameServer) ? nameServer : rocketMQProperties.getNameServer(); String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (StringUtils.hasLength(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (StringUtils.hasLength(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable())); if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); } container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); String namespace = environment.resolvePlaceholders(annotation.namespace()); container.setNamespace(RocketMQUtil.getNamespace(namespace, rocketMQProperties.getConsumer().getNamespace())); return container; }
这一步主要是将RocketMQMessageListener
注解上的属性赋值到DefaultRocketMQListenerContainer
中,实例化一个DefaultRocketMQListenerContainer
对象。
实例化完后,紧接着进行了一个genericApplicationContext.getBean
操作,就是为了触发DefaultRocketMQListenerContainer
Bean的生命周期方法。
先看DefaultRocketMQListenerContainer
的结构:
在Bean的getBean生命周期中,会先后经过ApplicationContextAware
和InitializingBean
两个接口方法,即setApplicationContext
和afterPropertiesSet
方法。
通过代码可以看出,DefaultRocketMQListenerContainer
在afterPropertiesSet
阶段做了三个操作:
初始化RocketMQPushConsumer
对象。
获取RocketMQListener<T>
或RocketMQReplyListener<T, R>
中范型T的对象类型。
获取RocketMQListener<T>
或RocketMQReplyListener<T, R>
的onMessage
方法的MethodParameter
对象。
看一下initRocketMQPushConsumer
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 private void initRocketMQPushConsumer () throws MQClientException { if (rocketMQListener == null && rocketMQReplyListener == null ) { throw new IllegalArgumentException ("Property 'rocketMQListener' or 'rocketMQReplyListener' is required" ); } Assert.notNull(consumerGroup, "Property 'consumerGroup' is required" ); Assert.notNull(nameServer, "Property 'nameServer' is required" ); Assert.notNull(topic, "Property 'topic' is required" ); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this .rocketMQMessageListener.accessKey(), this .rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer (consumerGroup, rpcHook, new AllocateMessageQueueAveragely (), enableMsgTrace, this .applicationContext.getEnvironment(). resolveRequiredPlaceholders(this .rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false ); } else { log.debug("Access-key or secret-key not configure in " + this + "." ); consumer = new DefaultMQPushConsumer (consumerGroup, enableMsgTrace, this .applicationContext.getEnvironment(). resolveRequiredPlaceholders(this .rocketMQMessageListener.customizedTraceTopic())); } consumer.setNamespace(namespace); String customizedNameServer = this .applicationContext.getEnvironment().resolveRequiredPlaceholders(this .rocketMQMessageListener.nameServer()); if (customizedNameServer != null ) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null ) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadNumber); consumer.setConsumeThreadMin(consumeThreadNumber); consumer.setConsumeTimeout(consumeTimeout); consumer.setMaxReconsumeTimes(maxReconsumeTimes); consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown); consumer.setInstanceName(instanceName); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break ; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break ; default : throw new IllegalArgumentException ("Property 'messageModel' was wrong." ); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break ; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break ; default : throw new IllegalArgumentException ("Property 'selectorType' was wrong." ); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly ()); break ; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently ()); break ; default : throw new IllegalArgumentException ("Property 'consumeMode' was wrong." ); } consumer.setUseTLS(new Boolean (tlsEnable)); if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); } }
initRocketMQPushConsumer
方法主要就是实例化了一个DefaultMQPushConsumer
对象存储到DefaultRocketMQListenerContainer
对象中。
实例化过程中,设置了一大堆属性,包括设置了消费者的消息模型(messageModel)、订阅关系(consumer.subscribe
)、消息监听器(consumer.setMessageListener
)。
在设置消息监听器中,使用的是内部实现的默认消息监听器。
两者默认实现逻辑差不多,只是入参和出参的类型有些差异。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}" , messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms" , messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}" , messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}" , messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms" , messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}" , messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }
两者都是DefaultRocketMQListenerContainer
类的内部类,在处理消息上,都是调用内部的handleMessage
方法。
在处理消息时,通过判断RocketMQMessageListener
注解标记类实现的接口类是RocketMQListener
还是RocketMQReplyListener
,决定消息消费的逻辑,当实现接口为RocketMQReplyListener
时,表示可回复的消息。
RocketMQReplyListener#onMessage
方法返回的<R>会作为一条消息,通过消费者对应的生产者发送,消息的目标topic由原始消息决定,实现细节在MessageUtil.createReplyMessage(messageExt, convertToBytes(message))
方法下。
回过头,再看消息监听器的consumeMessage
方法在何时被调用,通过IDE的调用链可知,是由ConsumeMessageService
的consumeMessageDirectly
方法所触发。
而在分析DefaultMQPushConsumer
消费原理时,在defaultMQPushConsumerImpl.start()
阶段,消费者会根据消息监听器(messageListener)的类型决定实例化具体的ConsumeMessageService
。
而MessageListener
的具体实现就是DefaultRocketMQListenerContainer
里面的DefaultMessageListenerConcurrently
和DefaultMessageListenerOrderly
两个内部类。
至此,RocketMQTransactionListener
与DefaultMQPushConsumer
就关联起来了,两个关联的关键对象就是DefaultRocketMQListenerContainer
。
需要记住,真正做到消费消息的是RocketMQ核心包中的DefaultMQPushConsumer
消费者,而RocketMQTransactionListener
是对其做的扩展,通过SpringBoot能力帮助开发者省略了手动创建消费者的过程,让开发者只需要关注想要消费的topic、消费模式以及处理业务消息的业务逻辑等。