gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java [157:187]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    } finally {
      if (consumer != null) {
        consumer.close();
      }
    }
    return null;
  }

  private SimpleConsumer getSimpleConsumer(String broker) {
    if (this.activeConsumers.containsKey(broker)) {
      return this.activeConsumers.get(broker);
    }
    SimpleConsumer consumer = this.createSimpleConsumer(broker);
    this.activeConsumers.putIfAbsent(broker, consumer);
    return consumer;
  }

  private SimpleConsumer getSimpleConsumer(HostAndPort hostAndPort) {
    return this.getSimpleConsumer(hostAndPort.toString());
  }

  private SimpleConsumer createSimpleConsumer(String broker) {
    List<String> hostPort = Splitter.on(':').trimResults().omitEmptyStrings().splitToList(broker);
    return createSimpleConsumer(hostPort.get(0), Integer.parseInt(hostPort.get(1)));
  }

  private SimpleConsumer createSimpleConsumer(String host, int port) {
    return new SimpleConsumer(host, port, this.socketTimeoutMillis, this.bufferSize, this.clientName);
  }

  @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaWrapper.java [329:359]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      } finally {
        if (consumer != null) {
          consumer.close();
        }
      }
      return null;
    }

    private SimpleConsumer getSimpleConsumer(String broker) {
      if (this.activeConsumers.containsKey(broker)) {
        return this.activeConsumers.get(broker);
      }
      SimpleConsumer consumer = this.createSimpleConsumer(broker);
      this.activeConsumers.putIfAbsent(broker, consumer);
      return consumer;
    }

    private SimpleConsumer getSimpleConsumer(HostAndPort hostAndPort) {
      return this.getSimpleConsumer(hostAndPort.toString());
    }

    private SimpleConsumer createSimpleConsumer(String broker) {
      List<String> hostPort = Splitter.on(':').trimResults().omitEmptyStrings().splitToList(broker);
      return createSimpleConsumer(hostPort.get(0), Integer.parseInt(hostPort.get(1)));
    }

    private SimpleConsumer createSimpleConsumer(String host, int port) {
      return new SimpleConsumer(host, port, this.socketTimeoutMillis, this.bufferSize, this.clientName);
    }

    @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



