public long ack()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ArrayAckTrackingQueue.java [122:174]


  public long ack(long offset) throws InterruptedException {
    lock.lock();
    try {
      if (!validateOffset(offset)) {
        return CANNOT_ACK;
      }

      scope.gauge(MetricNames.IN_MEMORY_CAPACITY).update(capacity);
      // condition 5: this offset is the head of the ack queue
      // action:
      // (1) move head to the next un-acked offset
      // (2) adjust offsetMappingHeadIndex
      long ret;
      if (offset == offsetMappingHeadIndex + 1) {
        setAckStatus(headIndex, AckStatus.ACKED);
        while (items[headIndex].ackStatus == AckStatus.ACKED) {
          setAckStatus(headIndex, AckStatus.UNSET);
          headIndex = increaseIndex(headIndex);
          offsetMappingHeadIndex++;
        }
        if (lowestCancelableOffset < offsetMappingHeadIndex) {
          lowestCancelableOffset = offsetMappingHeadIndex;
        }
        updateLowestCancelableOffset(offsetMappingHeadIndex + 1);
        notFull.signalAll();
        scope
            .gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_COMMITTED)
            .update(offsetMappingHeadIndex);
        scope.gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED).update(size());
        ret = offsetMappingHeadIndex;
      } else {
        // condition 6: this offset is in the middle of the ack queue
        // action: just ack it
        int index = (int) (offset - 1 - offsetMappingHeadIndex + headIndex) % capacity;
        if (items[index].ackStatus == AckStatus.ACKED) {
          ret = DUPLICATED_ACK;
        } else {
          setAckStatus(index, AckStatus.ACKED);
          updateLowestCancelableOffset(offset);
          if (highestAckedOffset < offset) {
            highestAckedOffset = offset;
            scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_ACKED).update(offset);
            scope.gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED).update(size());
          }
          ret = IN_MEMORY_ACK_ONLY;
        }
      }
      updateState();
      return ret;
    } finally {
      lock.unlock();
    }
  }