private long getLastOffset()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [147:181]


  private long getLastOffset(TopicPartition topicPart, long timestamp) {
    BrokerInfo brokerInfo = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition());
    SimpleConsumer consumer = brokerInfo == null ? null : consumers.getUnchecked(brokerInfo);

    // If no broker, treat it as failure attempt.
    if (consumer == null) {
      LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", topicPart);
      return 0L;
    }

    // Fire offset request
    OffsetRequest request = new OffsetRequest(ImmutableMap.of(
      new TopicAndPartition(topicPart.getTopic(), topicPart.getPartition()),
      new PartitionOffsetRequestInfo(timestamp, 1)
    ), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());

    OffsetResponse response = consumer.getOffsetsBefore(request);

    // Retrieve offsets from response
    long[] offsets = response.hasError() ? null : response.offsets(topicPart.getTopic(), topicPart.getPartition());
    if (offsets == null || offsets.length <= 0) {
      short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());

      // If the topic partition doesn't exists, use offset 0 without logging error.
      if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
        consumers.refresh(brokerInfo);
        LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.",
                 topicPart, timestamp, errorCode);
      }
      return 0L;
    }

    LOG.debug("Offset {} fetched for {} with timestamp {}.", offsets[0], topicPart, timestamp);
    return offsets[0];
  }