private void receiveMessageImmediately()

in java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java [228:295]


    private void receiveMessageImmediately(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (!consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final int batchSize = this.getReceptionBatchSize();
            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                longPollingTimeout, attemptId);
            activityNanoTime = System.nanoTime();

            // Intercept before message reception.
            final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            consumer.doBefore(context, Collections.emptyList());

            final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                longPollingTimeout);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult result) {
                    // Intercept after message reception.
                    final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                        .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                        .collect(Collectors.toList());
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                    consumer.doAfter(context0, generalMessages);

                    try {
                        onReceiveMessageResult(result);
                    } catch (Throwable t) {
                        // Should never reach here.
                        log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                            + "clientId={}", mq, endpoints, clientId, t);
                        onReceiveMessageException(t, attemptId);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    String nextAttemptId = null;
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException exception = (StatusRuntimeException) t;
                        if (io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
                            nextAttemptId = request.getAttemptId();
                        }
                    }
                    // Intercept after message reception.
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                    consumer.doAfter(context0, Collections.emptyList());

                    log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                            "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                        clientId, t);
                    onReceiveMessageException(t, nextAttemptId);
                }
            }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }