in java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java [394:448]
private void changeInvisibleDuration(final MessageViewImpl messageView, final Duration duration,
final int attempt, final SettableFuture<Void> future0) {
final ClientId clientId = consumer.getClientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
final RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> future =
consumer.changeInvisibleDuration(messageView, duration);
Futures.addCallback(future, new FutureCallback<ChangeInvisibleDurationResponse>() {
@Override
public void onSuccess(ChangeInvisibleDurationResponse response) {
final String requestId = future.getContext().getRequestId();
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
log.error("Failed to change invisible duration due to the invalid receipt handle, forgive to "
+ "retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
+ "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
endpoints, requestId, status.getMessage());
future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
return;
}
// Log failure and retry later.
if (!Code.OK.equals(code)) {
log.error("Failed to change invisible duration, would retry later, clientId={}, "
+ "consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, "
+ "status message=[{}]", clientId, consumerGroup, messageId, attempt, mq, endpoints,
requestId, status.getMessage());
changeInvisibleDurationLater(messageView, duration, 1 + attempt, future0);
return;
}
// Set result if succeed in changing invisible time.
future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
log.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={} "
+ "messageId={}, attempt={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
messageId, attempt, mq, endpoints, requestId);
return;
}
log.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, "
+ "mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints,
requestId);
}
@Override
public void onFailure(Throwable t) {
// Log failure and retry later.
log.error("Exception raised while changing invisible duration, would retry later, clientId={}, "
+ "consumerGroup={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup,
messageId, mq, endpoints, t);
changeInvisibleDurationLater(messageView, duration, 1 + attempt, future0);
}
}, MoreExecutors.directExecutor());
}