in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/AbstractAckTrackingQueue.java [129:194]
  public abstract void receive(long offset, Map<AttributeKey, Attribute> attributes)
      throws InterruptedException;

  /**
   * Tests whether the AckTrackingQueue is full.
   *
   * @param offset the offset to mark
   * @return true if the queue is full for the given offset, false otherwise
   */
  protected abstract boolean isFull(long offset);
  /**
   * Tries to wait for the queue to become not full.
   *
   * @param offsetToMark the offset to be marked
   * @return false when the wait fails and the caller should return; true when the wait succeeds
   *     and the caller should proceed with other work
   * @throws InterruptedException if the current thread is interrupted while waiting for the
   *     not-full signal
   */
  protected boolean waitForNotFull(long offsetToMark) throws InterruptedException {
    // Check whether the ack tracking queue has been marked as not in use.
    // If it is no longer in use, there is no need to wait for it to become not full;
    // simply tell the caller that the wait failed.
    if (notInUse) {
      return false;
    }
    Stopwatch stopwatch = null;
    lock.lock();
    try {
      while (isFull(offsetToMark)) {
        if (notInUse) {
          if (stopwatch != null) {
            stopwatch.stop();
          }
          logger.info(
              "an ack tracking queue is not in use now",
              StructuredLogging.jobId(job.getJobId()),
              StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
              StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
              StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
              StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
          return false;
        } else {
          if (stopwatch == null) {
            stopwatch =
                scope.timer(AckTrackingQueue.MetricNames.STATUS_CHANGE_WAITING_TIME).start();
          }
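          // await() releases the lock and blocks until another thread signals notFull; the
          // enclosing while loop re-checks isFull/notInUse on wakeup, which also guards
          // against spurious wakeups.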
          notFull.await();
        }
      }
      if (stopwatch != null) {
        stopwatch.stop();
      }
      logger.debug(
          "an ack tracking queue is not full now",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
          StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
      return true;
    } finally {
      lock.unlock();
    }
  }
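  /*
   * Illustrative sketch only (not part of the original file): a concrete subclass is expected
   * to gate its receive(...) implementation on waitForNotFull(...) and stop when it returns
   * false. The method body below is hypothetical; only the waitForNotFull/isFull contract
   * comes from this class.
   *
   *   @Override
   *   public void receive(long offset, Map<AttributeKey, Attribute> attributes)
   *       throws InterruptedException {
   *     if (!waitForNotFull(offset)) {
   *       return; // queue is no longer in use, give up
   *     }
   *     // track the offset and its attributes; isFull(offset) decides when receive() blocks
   *   }
   */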