in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ArrayAckTrackingQueue.java [70:119]
public synchronized void receive(long offset, Map<AttributeKey, Attribute> attributes)
throws InterruptedException {
lock.lock();
try {
if (highestReceivedOffset != INITIAL_OFFSET) {
if (offset - highestReceivedOffset != 1) {
scope
.gauge(AckTrackingQueue.MetricNames.OFFSET_RECEIVED_GAP)
.update(offset - highestReceivedOffset);
LOGGER.warn(
"gaps between received offsets",
StructuredLogging.offsetGap(offset - highestReceivedOffset));
}
}
if (highestReceivedOffset < offset) {
scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_RECEIVED).update(offset);
// the first time to receive
if (highestReceivedOffset == INITIAL_OFFSET) {
this.offsetMappingHeadIndex = offset;
this.lowestCancelableOffset = offset;
this.highestReceivedOffset = offset;
this.highestAckedOffset = offset;
} else if (offset - highestReceivedOffset != 1) {
// offsets should be received in order.
// If a new offset is too large, it means that some offsets are purged from Kafka servers,
// so we need to reset the ack tracking queue.
this.offsetMappingHeadIndex = offset;
this.lowestCancelableOffset = offset;
this.highestReceivedOffset = offset;
this.highestAckedOffset = offset;
reset();
notFull.signalAll();
}
// if we cannot mark more offsets as received
// action: wait until this offset can be received (then proceed) or this queue is not in
// use
// anymore (then return)
if (waitForNotFull(offset + 1)) {
OffsetStatus status =
items[(int) (offset - offsetMappingHeadIndex + headIndex) % capacity];
status.attributes = attributes;
onReceive(attributes);
highestReceivedOffset = offset;
}
updateState();
}
} finally {
lock.unlock();
}
}