in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/LinkedAckTrackingQueue.java [90:152]
/**
 * Marks the tracked entry for {@code offset} as acked and reports the outcome.
 *
 * <p>The entry is looked up in {@code offsetStatusMap} under the key {@code offset - 1}. When the
 * first entry of the map is acked after this call, every consecutive acked entry at the front is
 * removed and closed, {@code highestCommittedOffset} and {@code headOffset} are advanced, and
 * waiters on {@code notFull} are signalled. Otherwise only the in-memory high-water mark
 * {@code highestAckedOffset} is raised when {@code offset} exceeds it.
 *
 * <p>The capacity gauge is emitted before the lock is taken; all other state changes happen while
 * holding {@code lock}.
 *
 * @param offset the offset to ack
 * @return {@code CANNOT_ACK} if {@code validateOffset} rejects the offset or no entry is tracked
 *     under {@code offset - 1}; {@code DUPLICATED_ACK} if that entry is already acked; the
 *     updated {@code highestCommittedOffset} if the front of the queue was purged; otherwise
 *     {@code IN_MEMORY_ACK_ONLY}
 */
public long ack(long offset) {
  if (!validateOffset(offset)) {
    return CANNOT_ACK;
  }
  scope.gauge(MetricNames.IN_MEMORY_CAPACITY).update(capacity);
  lock.lock();
  try {
    final OffsetStatus target = offsetStatusMap.get(offset - 1);
    // condition 3: nothing is tracked for this offset, so it was never received
    if (target == null) {
      return CANNOT_ACK;
    }
    // condition 4: a second ack for an already acked offset
    if (target.ackStatus == AckStatus.ACKED) {
      return DUPLICATED_ACK;
    }
    updateOffsetStatus(target, AckStatus.ACKED);
    // NOTE(review): this check runs after updateOffsetStatus(..., ACKED). If that helper assigns
    // target.ackStatus, the condition is always true and the removal is unconditional — confirm
    // whether the status from before the update was meant to be tested here.
    if (target.ackStatus != AckStatus.CANCELED) {
      cancelableOffsets.remove(target.offset);
    }

    final long result;
    Iterator<OffsetStatus> cursor = offsetStatusMap.values().iterator();
    // The map holds at least the entry found above, so next() cannot fail here.
    OffsetStatus head = cursor.next();
    if (head.ackStatus != AckStatus.ACKED) {
      // Front of the queue is still pending: the ack is recorded in memory only.
      if (highestAckedOffset < offset) {
        highestAckedOffset = offset;
        scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_ACKED).update(offset);
        scope
            .gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED)
            .update(offsetStatusMap.size());
      }
      result = IN_MEMORY_ACK_ONLY;
    } else {
      // Front of the queue is acked: drop the leading run of acked entries and advance the
      // committed offset past each one. The first iteration is guaranteed by the check above.
      do {
        highestCommittedOffset = head.offset + 1;
        cursor.remove();
        head.close();
        head = cursor.hasNext() ? cursor.next() : null;
      } while (head != null && head.ackStatus == AckStatus.ACKED);

      // With an empty queue, headOffset mirrors the committed offset to stay consistent with
      // ArrayTrackingQueue.
      if (head == null) {
        headOffset = highestCommittedOffset;
      } else {
        headOffset = head.offset;
      }
      notFull.signalAll();
      scope
          .gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_COMMITTED)
          .update(highestCommittedOffset);
      scope
          .gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED)
          .update(offsetStatusMap.size());
      result = highestCommittedOffset;
    }
    updateState();
    return result;
  } finally {
    lock.unlock();
  }
}