public synchronized void receive()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ArrayAckTrackingQueue.java [70:119]


  /**
   * Marks {@code offset} as received and records {@code attributes} in its tracking slot.
   *
   * <p>Behavior, all performed while holding {@code lock}:
   *
   * <ul>
   *   <li>If an offset was received before and {@code offset} is not exactly one greater than the
   *       highest received offset, the gap is reported via a gauge and a warning log.
   *   <li>If {@code offset} is not greater than the highest received offset, nothing else happens.
   *   <li>On the first receive, or when the new offset is not contiguous, the tracking positions
   *       are re-based to {@code offset}; in the non-contiguous case the queue is also reset and
   *       threads waiting on {@code notFull} are signalled.
   *   <li>The slot is populated only if {@code waitForNotFull(offset + 1)} returns {@code true};
   *       {@code updateState()} runs either way.
   * </ul>
   *
   * @param offset the offset being received
   * @param attributes attributes stored on the slot and passed to {@code onReceive}
   * @throws InterruptedException declared by this method; presumably raised while waiting for
   *     capacity in {@code waitForNotFull} (verify against that helper)
   */
  public synchronized void receive(long offset, Map<AttributeKey, Attribute> attributes)
      throws InterruptedException {
    // NOTE(review): this method holds both the intrinsic monitor (synchronized) and `lock`.
    // If waitForNotFull awaits on the `notFull` condition, only `lock` is released while waiting;
    // the monitor stays held. Confirm that no other synchronized method on this instance is
    // required to unblock that wait, or drop one of the two locking mechanisms class-wide.
    lock.lock();
    try {
      final boolean firstReceive = highestReceivedOffset == INITIAL_OFFSET;
      // Distance from the previously highest received offset; only meaningful when
      // !firstReceive. Captured once because highestReceivedOffset is reassigned below.
      final long gap = offset - highestReceivedOffset;

      if (!firstReceive && gap != 1) {
        scope.gauge(AckTrackingQueue.MetricNames.OFFSET_RECEIVED_GAP).update(gap);
        LOGGER.warn("gaps between received offsets", StructuredLogging.offsetGap(gap));
      }

      if (highestReceivedOffset < offset) {
        scope.gauge(AckTrackingQueue.MetricNames.HIGHEST_OFFSET_RECEIVED).update(offset);

        if (firstReceive || gap != 1) {
          // Either the very first offset, or a non-contiguous one: re-base all positions.
          this.offsetMappingHeadIndex = offset;
          this.lowestCancelableOffset = offset;
          this.highestReceivedOffset = offset;
          this.highestAckedOffset = offset;
          if (!firstReceive) {
            // offsets should be received in order.
            // If a new offset is too large, it means that some offsets are purged from Kafka
            // servers, so we need to reset the ack tracking queue.
            reset();
            notFull.signalAll();
          }
        }

        // if we cannot mark more offsets as received
        // action: wait until this offset can be received (then proceed) or this queue is not in
        // use anymore (then return)
        if (waitForNotFull(offset + 1)) {
          // Take the modulo in long arithmetic and narrow afterwards. A cast binds tighter than
          // '%', so casting first would truncate the sum before wrapping it into the array.
          final int slot = (int) ((offset - offsetMappingHeadIndex + headIndex) % capacity);
          OffsetStatus status = items[slot];
          status.attributes = attributes;
          onReceive(attributes);
          highestReceivedOffset = offset;
        }
        updateState();
      }
    } finally {
      lock.unlock();
    }
  }