gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java [201:295]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        } catch (Throwable t) {
          throw Throwables.propagate(t);
        }
      });
    } catch (Exception e) {
      log.error("Exception on polling records", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Subscribe to a Kafka topic.
   * TODO Add multi topic support
   * @param topic the topic to subscribe to
   */
  @Override
  public void subscribe(String topic) {
    this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
  }

  /**
   * Subscribe to a Kafka topic with a {@link GobblinConsumerRebalanceListener}.
   * TODO Add multi topic support
   * @param topic the topic to subscribe to
   * @param listener callback invoked when partitions are revoked from or assigned to this consumer
   */
  @Override
  public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
    this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        listener.onPartitionsRevoked(partitions.stream()
            .map(p -> new KafkaPartition.Builder().withTopicName(p.topic()).withId(p.partition()).build())
            .collect(Collectors.toList()));
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        listener.onPartitionsAssigned(partitions.stream()
            .map(p -> new KafkaPartition.Builder().withTopicName(p.topic()).withId(p.partition()).build())
            .collect(Collectors.toList()));
      }
    });
  }

  @Override
  public Map<String, Metric> getMetrics() {
    Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
    Map<String, Metric> codaHaleMetricMap = new HashMap<>();

    kafkaMetrics
        .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
    return codaHaleMetricMap;
  }

  /**
   * Commit offsets to Kafka asynchronously.
   * @param partitionOffsets mapping from each partition to the offset to commit for it
   */
  @Override
  public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
    // Translate Gobblin KafkaPartition keys into the Kafka client's TopicPartition/OffsetAndMetadata form.
    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
        .collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
            e -> new OffsetAndMetadata(e.getValue())));
    consumer.commitAsync(offsets, (committedOffsets, exception) -> {
      if (exception != null) {
        log.error("Exception while committing offsets {}", committedOffsets, exception);
      }
    });
  }

  /**
   * Commit offsets to Kafka synchronously.
   * @param partitionOffsets mapping from each partition to the offset to commit for it
   */
  @Override
  public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
        .collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
            e -> new OffsetAndMetadata(e.getValue())));
    consumer.commitSync(offsets);
  }

  /**
   * Returns the last committed offset for a KafkaPartition.
   * @param partition the partition whose committed offset is looked up
   * @return the last committed offset, or -1 if no offset has been committed for the partition
   */
  @Override
  public long committed(KafkaPartition partition) {
    OffsetAndMetadata offsetAndMetadata =
        consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
    return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
  }

  /**
   * Convert a {@link KafkaMetric} instance to a {@link Metric}.
   * @param kafkaMetric the Kafka metric to convert
   * @return the equivalent Coda Hale {@link Metric}
   */
  private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
    if (log.isDebugEnabled()) {
      log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
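
The partition-type mapping that the excerpt above repeats inline reduces to two small conversions: Kafka client TopicPartitions are mapped to Gobblin KafkaPartitions for the rebalance-listener callbacks, and KafkaPartition-keyed offsets are mapped back to TopicPartition/OffsetAndMetadata pairs before committing. The sketch below only illustrates that mapping and is not code taken from the file; the helper class name and the KafkaPartition import path are assumptions.

// Illustrative sketch only; not part of Kafka09ConsumerClient or Kafka1ConsumerClient.
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; // assumed package
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class PartitionConversions {

  /** TopicPartition to KafkaPartition, as done in onPartitionsRevoked/onPartitionsAssigned. */
  static List<KafkaPartition> toKafkaPartitions(Collection<TopicPartition> partitions) {
    return partitions.stream()
        .map(p -> new KafkaPartition.Builder().withTopicName(p.topic()).withId(p.partition()).build())
        .collect(Collectors.toList());
  }

  /** KafkaPartition offsets to a commit map, as done in commitOffsetsAsync/commitOffsetsSync. */
  static Map<TopicPartition, OffsetAndMetadata> toCommitMap(Map<KafkaPartition, Long> partitionOffsets) {
    return partitionOffsets.entrySet().stream()
        .collect(Collectors.toMap(
            e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
            e -> new OffsetAndMetadata(e.getValue())));
  }

  private PartitionConversions() {
  }
}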



gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java [169:263]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        } catch (Throwable t) {
          throw Throwables.propagate(t);
        }
      });
    } catch (Exception e) {
      log.error("Exception on polling records", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Subscribe to a Kafka topic.
   * TODO Add multi topic support
   * @param topic the topic to subscribe to
   */
  @Override
  public void subscribe(String topic) {
    this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
  }

  /**
   * Subscribe to a Kafka topic with a {@link GobblinConsumerRebalanceListener}.
   * TODO Add multi topic support
   * @param topic the topic to subscribe to
   * @param listener callback invoked when partitions are revoked from or assigned to this consumer
   */
  @Override
  public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
    this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        listener.onPartitionsRevoked(partitions.stream()
            .map(p -> new KafkaPartition.Builder().withTopicName(p.topic()).withId(p.partition()).build())
            .collect(Collectors.toList()));
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        listener.onPartitionsAssigned(partitions.stream()
            .map(p -> new KafkaPartition.Builder().withTopicName(p.topic()).withId(p.partition()).build())
            .collect(Collectors.toList()));
      }
    });
  }

  @Override
  public Map<String, Metric> getMetrics() {
    Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
    Map<String, Metric> codaHaleMetricMap = new HashMap<>();

    kafkaMetrics
        .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
    return codaHaleMetricMap;
  }

  /**
   * Commit offsets to Kafka asynchronously.
   * @param partitionOffsets mapping from each partition to the offset to commit for it
   */
  @Override
  public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
    // Translate Gobblin KafkaPartition keys into the Kafka client's TopicPartition/OffsetAndMetadata form.
    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
        .collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
            e -> new OffsetAndMetadata(e.getValue())));
    consumer.commitAsync(offsets, (committedOffsets, exception) -> {
      if (exception != null) {
        log.error("Exception while committing offsets {}", committedOffsets, exception);
      }
    });
  }

  /**
   * Commit offsets to Kafka synchronously.
   * @param partitionOffsets mapping from each partition to the offset to commit for it
   */
  @Override
  public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream()
        .collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
            e -> new OffsetAndMetadata(e.getValue())));
    consumer.commitSync(offsets);
  }

  /**
   * Returns the last committed offset for a KafkaPartition.
   * @param partition the partition whose committed offset is looked up
   * @return the last committed offset, or -1 if no offset has been committed for the partition
   */
  @Override
  public long committed(KafkaPartition partition) {
    OffsetAndMetadata offsetAndMetadata =
        consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
    return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
  }

  /**
   * Convert a {@link KafkaMetric} instance to a {@link Metric}.
   * @param kafkaMetric the Kafka metric to convert
   * @return the equivalent Coda Hale {@link Metric}
   */
  private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
    if (log.isDebugEnabled()) {
      log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
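
Both excerpts are cut off just inside kafkaToCodaHaleMetric, so the actual conversion body is not visible here. Purely as an illustration of the general technique the Javadoc names (adapting a Kafka client metric to the Coda Hale Metric interface), the sketch below exposes KafkaMetric.value() through a Gauge. It is an assumed stand-in, not the Gobblin implementation; value() is the accessor available to the 0.9/1.x clients, and the adapter class name is invented for this example.

// Illustrative stand-in only; the real kafkaToCodaHaleMetric body is not shown in the excerpts above.
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import org.apache.kafka.common.metrics.KafkaMetric;

final class KafkaMetricAdapter {

  /** Wraps the Kafka metric's current value in a Coda Hale Gauge, which is itself a Metric. */
  static Metric asGauge(final KafkaMetric kafkaMetric) {
    Gauge<Double> gauge = () -> kafkaMetric.value(); // value() returns the metric's current double value
    return gauge;
  }

  private KafkaMetricAdapter() {
  }
}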



