in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ArrayAckTrackingQueue.java [122:174]
public long ack(long offset) throws InterruptedException {
lock.lock();
try {
if (!validateOffset(offset)) {
return CANNOT_ACK;
}
scope.gauge(MetricNames.IN_MEMORY_CAPACITY).update(capacity);
// condition 5: this offset is the head of the ack queue
// action:
// (1) move head to the next un-acked offset
// (2) adjust offsetMappingHeadIndex
long ret;
if (offset == offsetMappingHeadIndex + 1) {
setAckStatus(headIndex, AckStatus.ACKED);
while (items[headIndex].ackStatus == AckStatus.ACKED) {
setAckStatus(headIndex, AckStatus.UNSET);
headIndex = increaseIndex(headIndex);
offsetMappingHeadIndex++;
}
if (lowestCancelableOffset < offsetMappingHeadIndex) {
lowestCancelableOffset = offsetMappingHeadIndex;
}
updateLowestCancelableOffset(offsetMappingHeadIndex + 1);
notFull.signalAll();
scope
.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_COMMITTED)
.update(offsetMappingHeadIndex);
scope.gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED).update(size());
ret = offsetMappingHeadIndex;
} else {
// condition 6: this offset is in the middle of the ack queue
// action: just ack it
int index = (int) (offset - 1 - offsetMappingHeadIndex + headIndex) % capacity;
if (items[index].ackStatus == AckStatus.ACKED) {
ret = DUPLICATED_ACK;
} else {
setAckStatus(index, AckStatus.ACKED);
updateLowestCancelableOffset(offset);
if (highestAckedOffset < offset) {
highestAckedOffset = offset;
scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_ACKED).update(offset);
scope.gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED).update(size());
}
ret = IN_MEMORY_ACK_ONLY;
}
}
updateState();
return ret;
} finally {
lock.unlock();
}
}