protected void onInit()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java [76:139]


	protected void onInit() {
		if (extendedConsumerProperties.getExtension() == null
				|| !extendedConsumerProperties.getExtension().getEnabled()) {
			return;
		}
		try {
			super.onInit();
			if (this.retryTemplate != null) {
				Assert.state(getErrorChannel() == null,
						"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
								+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
								+ "send an error message when retries are exhausted");
				this.retryTemplate.registerListener(new RetryListener() {
					@Override
					public <T, E extends Throwable> boolean open(RetryContext context,
							RetryCallback<T, E> callback) {
						return true;
					}

					@Override
					public <T, E extends Throwable> void close(RetryContext context,
							RetryCallback<T, E> callback, Throwable throwable) {
					}

					@Override
					public <T, E extends Throwable> void onError(RetryContext context,
							RetryCallback<T, E> callback, Throwable throwable) {
					}
				});
			}
			pushConsumer = RocketMQConsumerFactory
					.initPushConsumer(extendedConsumerProperties);
			// prepare register consumer message listener,the next step is to be
			// compatible with a custom MessageListener.
			if (extendedConsumerProperties.getExtension().getPush().getOrderly()) {
				pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs,
						context) -> RocketMQInboundChannelAdapter.this
								.consumeMessage(msgs, () -> {
									context.setSuspendCurrentQueueTimeMillis(
											extendedConsumerProperties.getExtension()
													.getPush()
													.getSuspendCurrentQueueTimeMillis());
									return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
								}, () -> ConsumeOrderlyStatus.SUCCESS));
			}
			else {
				pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs,
						context) -> RocketMQInboundChannelAdapter.this
								.consumeMessage(msgs, () -> {
									context.setDelayLevelWhenNextConsume(
											extendedConsumerProperties.getExtension()
													.getPush()
													.getDelayLevelWhenNextConsume());
									return ConsumeConcurrentlyStatus.RECONSUME_LATER;
								}, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS));
			}
		}
		catch (Exception e) {
			log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
			throw new MessagingException(MessageBuilder.withPayload(
					"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
					.build(), e);
		}
	}