in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/MessageStub.java [148:179]
/**
 * Stores {@code code} as the cancel code of this stub and cancels whatever work is pending.
 *
 * <p>The call is a no-op returning {@code false} when the stub is already closed or when a
 * cancel code has been set before. Otherwise exactly one of the following happens, checked in
 * this order:
 *
 * <ol>
 *   <li>a pending future permit is cancelled ({@code PERMIT_CANCEL});
 *   <li>the current attempt is cancelled ({@code ATTEMPT_CANCEL});
 *   <li>the retry future is cancelled, provided at least one attempt has been counted
 *       ({@code RETRY_CANCEL});
 *   <li>nothing is cancelled and only {@code PASSIVE_CANCEL} is logged.
 * </ol>
 *
 * @param code the code to record as the cancel code
 * @return {@code true} if this call recorded the cancel code, {@code false} if the stub was
 *     already closed or a cancel code was already present
 */
public synchronized boolean cancel(DispatcherResponse.Code code) {
  // Closed means the ack was already received; in that case the cancel code is left untouched
  // (the short-circuit keeps compareAndSet from running). Only the first cancel wins.
  if (closed || !cancelCode.compareAndSet(null, code)) {
    return false;
  }

  // Snapshot both fields once so the null checks and the calls below see the same references.
  final CompletableFuture pendingPermit = futurePermit;
  final Cancelable activeAttempt = currentAttempt;

  if (pendingPermit != null) {
    // A permit is still pending: complete it by cancelling.
    pendingPermit.cancel(false);
    logDebugStatus(DebugStatus.PERMIT_CANCEL);
  } else if (activeAttempt != null) {
    // Cancel arrived during an attempt. A cancelled attempt should skip retry
    // with the grpc Cancelled state.
    activeAttempt.cancel();
    logDebugStatus(DebugStatus.ATTEMPT_CANCEL);
  } else if (retryFuture != null && attemptCount.get() > 0) {
    // Cancel the retry future. An attempt scheduled in failsafe due to backoff should be
    // cancelled as well and fall back to the fallback policy with CancellationException.
    logDebugStatus(DebugStatus.RETRY_CANCEL);
    retryFuture.cancel(false);
  } else {
    // Nothing to cancel actively; only the cancel code has been recorded.
    logDebugStatus(DebugStatus.PASSIVE_CANCEL);
  }
  return true;
}