in java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java [228:295]
/**
 * Sends a single receive-message request for {@code mq} and registers a callback that handles
 * the outcome.
 *
 * <p>If the consumer is not running, the method logs and returns without sending anything.
 * Otherwise it builds the request, runs the "before" interceptors, issues the call, and attaches
 * a callback that runs the "after" interceptors and then hands the result to
 * {@code onReceiveMessageResult} or the failure to {@code onReceiveMessageException}.
 *
 * @param attemptId attempt id placed into the receive request; it is also the id forwarded to
 *                  {@code onReceiveMessageException} when the request cannot be sent or when
 *                  handling a successful result throws.
 */
private void receiveMessageImmediately(String attemptId) {
    final ClientId clientId = consumer.getClientId();
    if (!consumer.isRunning()) {
        log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
        return;
    }
    try {
        final Endpoints endpoints = mq.getBroker().getEndpoints();
        final int batchSize = this.getReceptionBatchSize();
        final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
        final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
            longPollingTimeout, attemptId);
        // Record the moment of the latest reception attempt.
        activityNanoTime = System.nanoTime();

        // Intercept before message reception.
        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
        consumer.doBefore(context, Collections.emptyList());

        final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
            longPollingTimeout);
        Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
            @Override
            public void onSuccess(ReceiveMessageResult result) {
                // Intercept after message reception.
                final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                    .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                    .collect(Collectors.toList());
                final MessageInterceptorContextImpl context0 =
                    new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                consumer.doAfter(context0, generalMessages);

                try {
                    onReceiveMessageResult(result);
                } catch (Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                        + "clientId={}", mq, endpoints, clientId, t);
                    onReceiveMessageException(t, attemptId);
                }
            }

            @Override
            public void onFailure(Throwable t) {
                // Reuse the attempt id of this request only when the call failed with
                // DEADLINE_EXCEEDED; for any other failure no attempt id is carried over.
                String nextAttemptId = null;
                if (t instanceof StatusRuntimeException) {
                    final StatusRuntimeException exception = (StatusRuntimeException) t;
                    // io.grpc.Status has no value equality (equals() is identity-based), and a
                    // status with a description or cause is a different instance from the
                    // Status.DEADLINE_EXCEEDED constant. Compare the status codes instead.
                    if (io.grpc.Status.Code.DEADLINE_EXCEEDED == exception.getStatus().getCode()) {
                        nextAttemptId = request.getAttemptId();
                    }
                }
                // Intercept after message reception.
                final MessageInterceptorContextImpl context0 =
                    new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                consumer.doAfter(context0, Collections.emptyList());

                log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                        "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                    clientId, t);
                onReceiveMessageException(t, nextAttemptId);
            }
        }, MoreExecutors.directExecutor());
        // Count the issued request on this object's counter and on the consumer's counter.
        receptionTimes.getAndIncrement();
        consumer.getReceptionTimes().getAndIncrement();
    } catch (Throwable t) {
        // Anything thrown while building or issuing the request is routed to the same
        // exception handler as asynchronous failures, keeping the original attempt id.
        log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
        onReceiveMessageException(t, attemptId);
    }
}