in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java [76:139]
protected void onInit() {
if (extendedConsumerProperties.getExtension() == null
|| !extendedConsumerProperties.getExtension().getEnabled()) {
return;
}
try {
super.onInit();
if (this.retryTemplate != null) {
Assert.state(getErrorChannel() == null,
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
this.retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
});
}
pushConsumer = RocketMQConsumerFactory
.initPushConsumer(extendedConsumerProperties);
// prepare register consumer message listener,the next step is to be
// compatible with a custom MessageListener.
if (extendedConsumerProperties.getExtension().getPush().getOrderly()) {
pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs,
context) -> RocketMQInboundChannelAdapter.this
.consumeMessage(msgs, () -> {
context.setSuspendCurrentQueueTimeMillis(
extendedConsumerProperties.getExtension()
.getPush()
.getSuspendCurrentQueueTimeMillis());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}, () -> ConsumeOrderlyStatus.SUCCESS));
}
else {
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs,
context) -> RocketMQInboundChannelAdapter.this
.consumeMessage(msgs, () -> {
context.setDelayLevelWhenNextConsume(
extendedConsumerProperties.getExtension()
.getPush()
.getDelayLevelWhenNextConsume());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS));
}
}
catch (Exception e) {
log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
.build(), e);
}
}