public abstract void receive()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/AbstractAckTrackingQueue.java [129:194]


  public abstract void receive(long offset, Map<AttributeKey, Attribute> attributes)
      throws InterruptedException;

  /**
   * tests if AckTrackingQueue is full
   *
   * @param offset the offset to mark
   * @return the boolean
   */
  protected abstract boolean isFull(long offset);

  /**
   * Try to wait for the queue to be not full.
   *
   * @param offsetToMark the offset to be marked
   * @return false when the wait fails, and the caller should return; true when the wait succeeds,
   *     and the caller should proceed to do other work
   * @throws InterruptedException when the current thread is waiting for the signal, it may be
   *     interrupted.
   */
  protected boolean waitForNotFull(long offsetToMark) throws InterruptedException {
    // Check whether the ack tracking is marked as not in use or not.
    // If it's not in use any more, there is no need to wait for the ack tracking is not full,
    // simply tell the caller that it cannot acquire the lock.
    if (notInUse) {
      return false;
    }
    Stopwatch stopwatch = null;
    lock.lock();
    try {
      while (isFull(offsetToMark)) {
        if (notInUse) {
          if (stopwatch != null) {
            stopwatch.stop();
          }
          logger.info(
              "an ack tracking queue is not in use now",
              StructuredLogging.jobId(job.getJobId()),
              StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
              StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
              StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
              StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
          return false;
        } else {
          if (stopwatch == null) {
            stopwatch =
                scope.timer(AckTrackingQueue.MetricNames.STATUS_CHANGE_WAITING_TIME).start();
          }
          notFull.await();
        }
      }
      if (stopwatch != null) {
        stopwatch.stop();
      }
      logger.debug(
          "an ack tracking queue is not full now",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
          StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
      return true;
    } finally {
      lock.unlock();
    }
  }