in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/BlockingQueueStubManager.java [231:259]
/**
 * Looks for a blocking message on {@code topicPartition} and, if the cancel rules allow it, marks
 * that message as canceled in every blocking queue.
 *
 * <p>Queues are consulted in iteration order and only the first blocking message reported is
 * considered. The outcome of the cancel rules is always logged and reported, whether or not the
 * cancel succeeds.
 *
 * @return the cancel message (cancel result plus the metadata of the canceled message) when a
 *     blocking message was found and the cancel rules succeeded; otherwise an empty optional
 */
private synchronized Optional<CancelMessage> tryResolve() {
  // Scan the queues in order, stopping at the first one that reports a blocking message.
  Optional<BlockingQueue.BlockingMessage> detected = Optional.empty();
  for (BlockingQueue queue : blockingQueues) {
    detected = queue.detectBlockingMessage(topicPartition);
    if (detected.isPresent()) {
      break;
    }
  }
  if (!detected.isPresent()) {
    // Nothing is blocking, so there is nothing to cancel.
    return Optional.empty();
  }

  BlockingQueue.BlockingMessage blocker = detected.get();
  TopicPartitionOffset blockerMetadata = blocker.getMetaData();
  BlockingQueue.BlockingReason blockingReason = blocker.getReason();

  // Evaluate the cancel rules for this reason and report the outcome either way.
  CancelResult cancelResult = runCancelRules(blockingReason);
  logAndReportCancelResult(job, blockingReason, cancelResult);
  if (!cancelResult.isSucceed()) {
    return Optional.empty();
  }

  // The cancel was accepted: mark the message as canceled in every queue, not only the one
  // that reported it.
  for (BlockingQueue queue : blockingQueues) {
    queue.markCanceled(blockerMetadata);
  }
  return Optional.of(new CancelMessage(cancelResult, blockerMetadata));
}