⚠️ Note: rocketmq-spring-boot-starter targets the RocketMQ 4.x client and is intended mainly for applications that depend on RocketMQ 4.x.
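Pull vs. Push Consumption Modes
Internally, RocketMQ consumption is implemented as PULL. PUSH mode is a pseudo-push that wraps PULL: each time the consumer pulls a batch of messages, it submits the batch to the consumer-side thread pool (asynchronously) and then immediately pulls from the Broker again, which produces a push-like effect.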
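In PULL mode, consuming messages involves the following:
Message pulling: the application uses the PULL APIs to fetch a batch of messages from a specified message queue on the Broker into the consumer client. When there are multiple consumers, queue assignment has to be done manually.
After the consumer finishes processing, it must report the consumption progress of the queue to the Broker and then pull the next batch.
If a message fails to be consumed, the consumer must tell the Broker that the message failed and needs to be retried later, by manually calling the sendMessageBack method.
In PUSH mode the user does not have to deal with any of this. You only tell RocketMQ which listener to invoke after messages have been pulled; storing the consumption progress and retrying failed messages are handled uniformly by the RocketMQ client.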
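To make the "push is a wrapper around pull" idea concrete, here is a minimal sketch using only the JDK. It is a toy model, not RocketMQ code: FakeBroker, drain and the listener are hypothetical names invented for illustration. The loop pulls a batch, hands it to a thread pool, and pulls again without waiting for the listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model only: none of these types are RocketMQ classes. */
class PushOverPullSketch {

    /** Hypothetical in-memory stand-in for one broker queue. */
    static final class FakeBroker {
        private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        void send(String msg) {
            queue.add(msg);
        }

        /** Returns up to batchSize messages; an empty list means nothing is available. */
        List<String> pull(int batchSize) {
            List<String> batch = new ArrayList<>();
            String msg;
            while (batch.size() < batchSize && (msg = queue.poll()) != null) {
                batch.add(msg);
            }
            return batch;
        }
    }

    /**
     * "Push" built on pull: pull a batch, submit it to the consume pool
     * asynchronously, then pull again right away.
     */
    static List<String> drain(FakeBroker broker, int batchSize) {
        List<String> consumed = new CopyOnWriteArrayList<>();
        Consumer<String> listener = consumed::add; // the user-supplied callback
        ExecutorService consumePool = Executors.newFixedThreadPool(2);
        while (true) {
            List<String> batch = broker.pull(batchSize);
            if (batch.isEmpty()) {
                break; // a real client would keep polling instead of stopping
            }
            consumePool.submit(() -> batch.forEach(listener));
        }
        consumePool.shutdown();
        try {
            consumePool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        for (int i = 0; i < 5; i++) {
            broker.send("msg-" + i);
        }
        System.out.println(drain(broker, 2).size() + " messages consumed");
    }
}
```

The user code only supplies the listener; batching, thread hand-off and the next pull all live in the loop, which is the division of labour the PUSH mode description refers to.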
Pull Consumer API
The pull consumer is implemented by org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.
Startup method: DefaultLitePullConsumer#start() -> DefaultLitePullConsumerImpl#start()
How messages are consumed: by manually calling DefaultLitePullConsumer#poll()
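Push Consumer API
The push consumer is implemented by org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.
Core parameter (must be set manually after the object is instantiated): MessageListener messageListener
Startup method: DefaultMQPushConsumer#start() -> DefaultMQPushConsumerImpl#start()
On startup the consumer calls checkConfig to validate its properties. Among other things, it checks that messageListener is not null and that it implements either MessageListenerOrderly or MessageListenerConcurrently.
How messages are consumed: once started, the consumer pulls messages automatically in the background and triggers consumption, which calls back into the messageListener object.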
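The listener check performed at startup can be pictured with the following sketch. It is an assumption-laden illustration, not the library's checkConfig: the Listener, OrderlyListener and ConcurrentlyListener interfaces and the checkAndSelect method are hypothetical stand-ins for the real listener types.

```java
/** Toy model of the startup check; the interfaces below are hypothetical stand-ins. */
class ListenerCheckSketch {
    interface Listener { }
    interface OrderlyListener extends Listener { }
    interface ConcurrentlyListener extends Listener { }

    enum ConsumeService { ORDERLY, CONCURRENTLY }

    /**
     * Rejects a missing or unsupported listener and reports which kind of
     * consume service would serve a valid one.
     */
    static ConsumeService checkAndSelect(Listener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("messageListener is null");
        }
        if (listener instanceof OrderlyListener) {
            return ConsumeService.ORDERLY;
        }
        if (listener instanceof ConcurrentlyListener) {
            return ConsumeService.CONCURRENTLY;
        }
        throw new IllegalArgumentException(
            "messageListener must be an OrderlyListener or a ConcurrentlyListener");
    }

    public static void main(String[] args) {
        System.out.println(checkAndSelect(new OrderlyListener() { }));
        System.out.println(checkAndSelect(new ConcurrentlyListener() { }));
    }
}
```

Failing fast here means a misconfigured consumer never reaches the point where it starts pulling messages.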
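How DefaultMQPushConsumer Consumes Messages
DefaultMQPushConsumer begins consuming when it is started, so the start method is the place to begin.
traceDispatcher is an interface for asynchronous data transfer. It is null by default and is an extension feature. For now it is enough to focus on defaultMQPushConsumerImpl.start().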
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                if (this.pullAPIWrapper == null) {
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                }
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                // Choose the offset store: local file for broadcasting, broker-side for clustering
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                // Choose the consume service according to the listener type
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    this.consumeMessagePopService =
                        new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    this.consumeMessagePopService =
                        new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();
                this.consumeMessagePopService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
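Within the whole start sequence, the part that drives push consumption is mQClientFactory.start(), so let's look at its implementation.
Inside it, this.pullMessageService.start() is the key to consuming messages. PullMessageService extends ServiceThread.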
ServiceThread#start creates a Thread and passes the current object to it as the Runnable, so the thread executes the current object's run method.
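PullMessageService#run is a while loop that terminates based on the internal stopped flag. Each iteration has two steps:
this.messageRequestQueue is a LinkedBlockingQueue; this.messageRequestQueue.take() blocks until a message request arrives.
Once a request has been taken from the queue, the pull method is called. By default this is this.pullMessage((PullRequest)messageRequest).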
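The thread-plus-blocking-queue pattern described here can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not the PullMessageService source: PullLoopSketch, enqueue, awaitHandled and the String requests are hypothetical, and "handling" a request just records it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

/** Toy service thread: the object is its own Runnable and loops until stopped. */
class PullLoopSketch implements Runnable {
    private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final Thread thread = new Thread(this, "pull-loop-sketch");
    private volatile boolean stopped = false;

    void start() {
        thread.start(); // the new thread executes this.run()
    }

    void enqueue(String request) {
        requestQueue.offer(request);
    }

    /** Waits until at least count requests were handled or the timeout elapses. */
    boolean awaitHandled(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (handled.size() < count && System.nanoTime() < deadline) {
            LockSupport.parkNanos(1_000_000L);
        }
        return handled.size() >= count;
    }

    void shutdown() {
        stopped = true;
        thread.interrupt(); // wake the thread if it is blocked in take()
        try {
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> handled() {
        return handled;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String request = requestQueue.take(); // step 1: block until a request arrives
                handled.add(request);                 // step 2: "pull" for that request
            } catch (InterruptedException e) {
                // interrupted while waiting; the loop condition re-checks the stopped flag
            }
        }
    }

    public static void main(String[] args) {
        PullLoopSketch loop = new PullLoopSketch();
        loop.start();
        loop.enqueue("pull-request-1");
        loop.enqueue("pull-request-2");
        loop.awaitHandled(2, 2000);
        loop.shutdown();
        System.out.println(loop.handled());
    }
}
```

Because take() blocks, the thread consumes no CPU while the queue is empty, and the stopped flag plus an interrupt is enough to end the loop.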
That method looks up the consumer object by its consumer group name and calls the consumer's pullMessage method.
    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

        // Flow control: too many cached messages
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        // Flow control: cached messages take too much memory
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            // Flow control: offset span of cached messages is too large
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isPreviouslyLocked()) {
                    long offset = -1L;
                    try {
                        offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                        if (offset < 0) {
                            throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                        }
                    } catch (Exception e) {
                        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                        log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                        return;
                    }
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setPreviouslyLocked(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        // Callback invoked once the asynchronous pull returns
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                // Cache the messages, hand them to the consume service, then schedule the next pull
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            // Asynchronous remote pull; the result is delivered to pullCallback
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
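The method mainly defines an anonymous inner class implementing PullCallback and then calls this.pullAPIWrapper.pullKernelImpl to pull messages remotely. The result is delivered to the PullCallback's onSuccess or onException method; in the normal case onSuccess is invoked.
When the pull status of the PullResult is FOUND, messages were pulled, and DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest is called.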
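The asynchronous call with an onSuccess/onException callback can be modelled as follows. This is a sketch under stated assumptions rather than the client's actual machinery: the nested PullStatus, PullResult and PullCallback types only mirror the names used in the text, pullAsync and RecordingCallback are hypothetical, and a CompletableFuture stands in for the remote call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Toy model of an asynchronous pull with a success/exception callback. */
class PullCallbackSketch {
    enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    static final class PullResult {
        final PullStatus status;
        final List<String> messages;

        PullResult(PullStatus status, List<String> messages) {
            this.status = status;
            this.messages = messages;
        }
    }

    interface PullCallback {
        void onSuccess(PullResult result);
        void onException(Throwable error);
    }

    /** Records what a push consumer would do next for each outcome. */
    static final class RecordingCallback implements PullCallback {
        final List<String> actions = new ArrayList<>();

        @Override
        public void onSuccess(PullResult result) {
            switch (result.status) {
                case FOUND:
                    actions.add("submit " + result.messages.size() + " message(s) to the consume service");
                    actions.add("pull again");
                    break;
                case NO_NEW_MSG:
                case NO_MATCHED_MSG:
                    actions.add("pull again");
                    break;
                case OFFSET_ILLEGAL:
                    actions.add("drop queue and fix offset");
                    break;
            }
        }

        @Override
        public void onException(Throwable error) {
            actions.add("retry later: " + error.getMessage());
        }
    }

    /** Runs the (simulated) remote call on another thread and routes the outcome to the callback. */
    static CompletableFuture<Void> pullAsync(Supplier<PullResult> remoteCall, PullCallback callback) {
        return CompletableFuture.supplyAsync(remoteCall).<Void>handle((result, error) -> {
            if (error != null) {
                // supplyAsync wraps failures in CompletionException; unwrap for the callback
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
                callback.onException(cause);
            } else {
                callback.onSuccess(result);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        pullAsync(() -> new PullResult(PullStatus.FOUND, Arrays.asList("a", "b")), callback).join();
        pullAsync(() -> { throw new IllegalStateException("broker unreachable"); }, callback).join();
        callback.actions.forEach(System.out::println);
    }
}
```

The caller never blocks on the pull itself; whatever happens next (consume, pull again, back off) is decided inside the callback, which is why the real onSuccess contains the status switch.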
this.consumeMessageService is chosen according to the type of the message listener (MessageListener). When the listener is a MessageListenerOrderly, execution enters the submitConsumeRequest method of ConsumeMessageOrderlyService.
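this.consumeExecutor is a thread pool, so the method to look at is the run method of the ConsumeRequest object.
In run, the listener callback comes down to the call messageListener.consumeMessage(Collections.unmodifiableList(msgs), context), which invokes the message listener's method.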
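How @RocketMQMessageListener Works
DefaultMQPushConsumer is the class in the RocketMQ core library that implements the push consumer, whereas RocketMQMessageListener comes from the rocketmq-spring-boot package. In other words, it is built on Spring Boot.
Starting from RocketMQAutoConfiguration, we find the RocketMQListenerConfiguration class.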
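It registers a bean whose beanClass is RocketMQMessageListenerBeanPostProcessor, which implements the BeanPostProcessor interface.
So the method to look at is postProcessAfterInitialization. After each bean is initialized, RocketMQMessageListenerBeanPostProcessor checks whether the bean carries the RocketMQMessageListener annotation. For annotated classes it does two things:
enhance: enhances the attributes of the annotation.
Registers the bean with ListenerContainerConfiguration.
By default rocketmq-spring-boot provides no implementation of the AnnotationEnhancer interface for the enhance step, so the step that matters is the second one, listenerContainerConfiguration.registerContainer(beanName, bean, enhance).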
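The annotation check performed after bean initialization can be sketched without Spring. Everything below is hypothetical and only imitates the shape of the mechanism: @DemoMessageListener stands in for the real annotation, the method merely borrows the name postProcessAfterInitialization, and the "registration" is a map entry instead of a listener container.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy post-processor: detects annotated beans and registers them. Not Spring code. */
class ListenerScanSketch {

    /** Hypothetical stand-in for the listener annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoMessageListener {
        String topic();
        String consumerGroup();
    }

    @DemoMessageListener(topic = "demo-topic", consumerGroup = "demo-group")
    static class OrderListener { }

    static class PlainBean { }

    /** beanName -> "consumerGroup/topic" for every annotated bean seen so far. */
    private final Map<String, String> containers = new LinkedHashMap<>();

    /** Called once per bean after initialization; non-annotated beans pass through untouched. */
    Object postProcessAfterInitialization(Object bean, String beanName) {
        DemoMessageListener ann = bean.getClass().getAnnotation(DemoMessageListener.class);
        if (ann != null) {
            containers.put(beanName, ann.consumerGroup() + "/" + ann.topic());
        }
        return bean;
    }

    Map<String, String> containers() {
        return containers;
    }

    public static void main(String[] args) {
        ListenerScanSketch processor = new ListenerScanSketch();
        processor.postProcessAfterInitialization(new OrderListener(), "orderListener");
        processor.postProcessAfterInitialization(new PlainBean(), "plainBean");
        System.out.println(processor.containers());
    }
}
```

Hooking into bean initialization means listener classes need no explicit registration call; being a bean with the annotation is enough.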
    public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
        }

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
        }

        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());

        boolean listenerEnabled =
            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);

        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        // Register the container bean, then fetch it to trigger its lifecycle
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
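The method does two main things:
It registers a DefaultRocketMQListenerContainer bean with the Spring container and triggers the bean lifecycle.
It calls DefaultRocketMQListenerContainer#start() to start the container.
Now look at the createRocketMQListenerContainer method used during registration.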
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        container.setRocketMQMessageListener(annotation);

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.hasLength(nameServer) ? nameServer : rocketMQProperties.getNameServer();
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (StringUtils.hasLength(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (StringUtils.hasLength(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
        if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQListener((RocketMQListener) bean);
        } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQReplyListener((RocketMQReplyListener) bean);
        }
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);

        String namespace = environment.resolvePlaceholders(annotation.namespace());
        container.setNamespace(RocketMQUtil.getNamespace(namespace,
            rocketMQProperties.getConsumer().getNamespace()));
        return container;
    }
This step instantiates a DefaultRocketMQListenerContainer object and copies the attributes of the RocketMQMessageListener annotation onto it.
Right after the registration, genericApplicationContext.getBean is called, precisely to trigger the lifecycle methods of the DefaultRocketMQListenerContainer bean.
DefaultRocketMQListenerContainer implements, among others, ApplicationContextAware and InitializingBean. During the getBean lifecycle the bean passes through the methods of these two interfaces in turn, that is setApplicationContext and then afterPropertiesSet.
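From the code, DefaultRocketMQListenerContainer does three things in the afterPropertiesSet phase:
It initializes the push consumer object (initRocketMQPushConsumer).
It resolves the actual type of the generic parameter T in RocketMQListener<T> or RocketMQReplyListener<T, R>.
It obtains the MethodParameter object for the onMessage method of RocketMQListener<T> or RocketMQReplyListener<T, R>.
Let's look at the initRocketMQPushConsumer method.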
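Resolving the generic parameter T of a listener can be done with plain reflection. The sketch below shows one way, under assumptions: DemoListener<T>, OrderEvent and resolveMessageType are hypothetical names, and the real container may resolve types differently (for example through proxies or deeper type hierarchies).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Toy resolver for the T in a listener that implements DemoListener<T>. */
class MessageTypeSketch {

    /** Hypothetical stand-in for a generic listener interface. */
    interface DemoListener<T> {
        void onMessage(T message);
    }

    static class OrderEvent { }

    static class OrderListener implements DemoListener<OrderEvent> {
        @Override
        public void onMessage(OrderEvent message) { }
    }

    /** A listener declared with the raw type, so no type argument is recorded. */
    @SuppressWarnings("rawtypes")
    static class RawListener implements DemoListener {
        @Override
        public void onMessage(Object message) { }
    }

    /**
     * Walks the class hierarchy looking for DemoListener<...> among the generic
     * interfaces and returns its first type argument; falls back to Object.
     */
    static Type resolveMessageType(Class<?> listenerClass) {
        for (Class<?> c = listenerClass; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Type iface : c.getGenericInterfaces()) {
                if (iface instanceof ParameterizedType) {
                    ParameterizedType pt = (ParameterizedType) iface;
                    if (pt.getRawType() == DemoListener.class) {
                        return pt.getActualTypeArguments()[0];
                    }
                }
            }
        }
        return Object.class;
    }

    public static void main(String[] args) {
        System.out.println(resolveMessageType(OrderListener.class));
        System.out.println(resolveMessageType(RawListener.class));
    }
}
```

Knowing T up front lets the container convert each raw message body into the type the user's onMessage expects before invoking it.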
    private void initRocketMQPushConsumer() throws MQClientException {
        if (rocketMQListener == null && rocketMQReplyListener == null) {
            throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
        }
        Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
        Assert.notNull(nameServer, "Property 'nameServer' is required");
        Assert.notNull(topic, "Property 'topic' is required");

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
        consumer.setNamespace(namespace);

        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        if (customizedNameServer != null) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if (accessChannel != null) {
            consumer.setAccessChannel(accessChannel);
        }
        consumer.setConsumeThreadMax(consumeThreadNumber);
        consumer.setConsumeThreadMin(consumeThreadNumber);
        consumer.setConsumeTimeout(consumeTimeout);
        consumer.setMaxReconsumeTimes(maxReconsumeTimes);
        consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
        consumer.setInstanceName(instanceName);

        // Message model
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        // Subscription
        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }

        // Message listener
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        // if String is not is equal "true" TLS mode will represent the as default value false
        consumer.setUseTLS(new Boolean(tlsEnable));

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }
    }
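The initRocketMQPushConsumer method mainly instantiates a DefaultMQPushConsumer object and stores it in the DefaultRocketMQListenerContainer.
Along the way it sets many properties, including the consumer's message model (messageModel), its subscription (consumer.subscribe), and its message listener (consumer.setMessageListener).
The message listener set here is one of two default listeners implemented inside the container.
The two default implementations are nearly identical; only the parameter and return types differ.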
    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
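Both are inner classes of DefaultRocketMQListenerContainer, and both handle messages by calling the container's handleMessage method.
When handling a message, the container checks whether the class annotated with RocketMQMessageListener implements RocketMQListener or RocketMQReplyListener and picks the consumption logic accordingly. Implementing RocketMQReplyListener means the message can be replied to.
The <R> value returned by RocketMQReplyListener#onMessage is sent as a message through the producer associated with the consumer. The target topic of the reply is determined by the original message; the details are in MessageUtil.createReplyMessage(messageExt, convertToBytes(message)).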
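The adapter role played by the default listeners can be sketched as follows. The types here are hypothetical simplifications (String messages, a list instead of a producer): DemoListener, DemoReplyListener, Status and the factory methods are made up for illustration and only mirror the behaviour described above, namely that an exception becomes a retry status and a reply listener's return value is sent back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy adapter between a batch-oriented listener API and a per-message user listener. */
class HandleMessageSketch {
    interface DemoListener<T> {
        void onMessage(T message);
    }

    interface DemoReplyListener<T, R> {
        R onMessage(T message);
    }

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    private final DemoListener<String> listener;
    private final DemoReplyListener<String, String> replyListener;
    /** Stands in for "send the reply through the associated producer". */
    private final List<String> sentReplies = new ArrayList<>();

    private HandleMessageSketch(DemoListener<String> listener, DemoReplyListener<String, String> replyListener) {
        this.listener = listener;
        this.replyListener = replyListener;
    }

    static HandleMessageSketch forListener(DemoListener<String> listener) {
        return new HandleMessageSketch(listener, null);
    }

    static HandleMessageSketch forReplyListener(DemoReplyListener<String, String> replyListener) {
        return new HandleMessageSketch(null, replyListener);
    }

    /** Mirrors the default listeners: any exception turns into a retry status. */
    Status consume(List<String> messages) {
        for (String message : messages) {
            try {
                handleMessage(message);
            } catch (Exception e) {
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }

    /** Dispatches to whichever user listener was configured. */
    private void handleMessage(String message) {
        if (listener != null) {
            listener.onMessage(message);
        } else if (replyListener != null) {
            sentReplies.add(replyListener.onMessage(message));
        }
    }

    List<String> sentReplies() {
        return sentReplies;
    }

    public static void main(String[] args) {
        HandleMessageSketch replying = forReplyListener(m -> "re:" + m);
        System.out.println(replying.consume(Arrays.asList("ping")) + " " + replying.sentReplies());
    }
}
```

With this split, user code only throws or returns; translating that into retry statuses and reply messages stays in one place.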
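Coming back to the question of when the listener's consumeMessage method is called: the IDE's call hierarchy shows that it is triggered from the ConsumeMessageService implementations, both in the ConsumeRequest#run path described earlier and in the consumeMessageDirectly method.
As seen when analyzing how DefaultMQPushConsumer consumes messages, during defaultMQPushConsumerImpl.start() the consumer decides which concrete ConsumeMessageService to instantiate based on the type of the message listener (messageListener).
Here the concrete MessageListener implementations are the two inner classes of DefaultRocketMQListenerContainer: DefaultMessageListenerConcurrently and DefaultMessageListenerOrderly.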
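At this point RocketMQMessageListener and DefaultMQPushConsumer are tied together, and the object that links them is DefaultRocketMQListenerContainer.
Keep in mind that what actually consumes messages is the DefaultMQPushConsumer from the RocketMQ core package. RocketMQMessageListener is an extension on top of it: using Spring Boot, it spares developers from creating consumers by hand, so they only need to care about the topic to consume, the consumption mode, and the business logic for handling messages.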