public long ack()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/LinkedAckTrackingQueue.java [90:152]


  /**
   * Marks the entry tracked for {@code offset} as acked and, when the head of the queue is acked,
   * purges the leading run of acked entries and advances the committed offset.
   *
   * <p>The entry is looked up under key {@code offset - 1}, and the committed offset is always
   * recorded as the purged entry's offset plus one.
   *
   * @param offset the offset to acknowledge
   * @return {@code CANNOT_ACK} if {@code validateOffset} rejects the offset or no entry is tracked
   *     for it; {@code DUPLICATED_ACK} if the entry is already acked; the new {@code
   *     highestCommittedOffset} if the head of the queue was purged; {@code IN_MEMORY_ACK_ONLY}
   *     otherwise
   */
  public long ack(long offset) {
    if (!validateOffset(offset)) {
      return CANNOT_ACK;
    }

    // Published before taking the lock, exactly as the capacity field reads at this moment.
    scope.gauge(MetricNames.IN_MEMORY_CAPACITY).update(capacity);
    lock.lock();
    try {
      final OffsetStatus target = offsetStatusMap.get(offset - 1);
      if (target == null) {
        // nothing is tracked under this key, so there is nothing to ack
        return CANNOT_ACK;
      }
      if (target.ackStatus == AckStatus.ACKED) {
        // a second ack for the same entry
        return DUPLICATED_ACK;
      }

      updateOffsetStatus(target, AckStatus.ACKED);
      // NOTE(review): this check runs after updateOffsetStatus; if that helper has already set
      // ackStatus to ACKED the condition is always true - confirm the intended order.
      if (target.ackStatus != AckStatus.CANCELED) {
        cancelableOffsets.remove(target.offset);
      }

      // The map holds at least the entry found above, so the first next() cannot fail.
      final Iterator<OffsetStatus> cursor = offsetStatusMap.values().iterator();
      OffsetStatus head = cursor.next();
      final long result;
      if (head.ackStatus != AckStatus.ACKED) {
        // Head is still pending: record the ack in memory only, nothing can be committed yet.
        if (offset > highestAckedOffset) {
          highestAckedOffset = offset;
          scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_ACKED).update(offset);
          scope
              .gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED)
              .update(offsetStatusMap.size());
        }
        result = IN_MEMORY_ACK_ONLY;
      } else {
        // Head is acked: drop every consecutive acked entry from the front of the queue,
        // moving the committed offset forward one entry at a time.
        do {
          highestCommittedOffset = head.offset + 1;
          cursor.remove();
          head.close();
          head = cursor.hasNext() ? cursor.next() : null;
        } while (head != null && head.ackStatus == AckStatus.ACKED);

        // With an empty queue, keep headOffset consistent with ArrayTrackingQueue.
        if (head == null) {
          headOffset = highestCommittedOffset;
        } else {
          headOffset = head.offset;
        }
        // Entries were removed, so wake anything waiting on the notFull condition.
        notFull.signalAll();
        scope
            .gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_COMMITTED)
            .update(highestCommittedOffset);
        scope
            .gauge(AckTrackingQueue.MetricNames.IN_MEMORY_UNCOMMITTED)
            .update(offsetStatusMap.size());
        result = highestCommittedOffset;
      }
      updateState();
      return result;
    } finally {
      lock.unlock();
    }
  }