uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [435:464]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Produces {@code numberOfMessages} records to {@code topicName}, waiting for each send to
   * complete, then subscribes a consumer to the topic and waits until a poll returns at least one
   * record.
   *
   * @param topicName topic to produce to and to subscribe the verifying consumer to
   * @param bootstrapServers passed to {@code prepareProducer} and {@code prepareConsumer}
   * @param numberOfMessages number of records to send
   * @param isEmptyMessages when {@code true} every record is sent with a {@code null} value;
   *     otherwise the value is the bytes of {@code "test message <index>"}
   * @throws ExecutionException if waiting on a send future fails
   * @throws InterruptedException if the thread is interrupted while waiting on a send future
   */
  private void sendKafkaMessages(
      String topicName, String bootstrapServers, int numberOfMessages, boolean isEmptyMessages)
      throws ExecutionException, InterruptedException {
    // try-with-resources guarantees both clients are closed (consumer first, then producer),
    // even when creating the consumer or closing one of the clients throws.
    try (KafkaProducer producer = prepareProducer(bootstrapServers);
        KafkaConsumer consumer = prepareConsumer(bootstrapServers)) {
      // send sync messages
      for (int index = 0; index < numberOfMessages; index++) {
        byte[] payload =
            isEmptyMessages
                ? null
                : String.format("test message %d", index)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ProducerRecord record = new ProducerRecord(topicName, payload);
        // Block on the returned future so each send completes before the next one starts.
        producer.send(record).get();
      }
      consumer.subscribe(ImmutableList.of(topicName));
      await()
          .atMost(MAX_AWAIT_TIME_IN_SEC, TimeUnit.SECONDS)
          .untilAsserted(
              () -> {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
                // Compute the count first: string concatenation binds tighter than != and ?:,
                // so inlining the ternary into the concatenation would drop the label and the
                // null guard.
                int recordCount = records != null ? records.count() : -1;
                System.out.println("found num record:" + recordCount);
                Assert.assertTrue(recordCount > 0);
              });
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



uforwarder-container/src/integrationTest/java/com/uber/data/kafka/consumerproxy/container/UforwarderContainerIntegrationTests.java [268:297]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Produces {@code numberOfMessages} records to {@code topicName}, waiting for each send to
   * complete, then subscribes a consumer to the topic and waits until a poll returns at least one
   * record.
   *
   * @param topicName topic to produce to and to subscribe the verifying consumer to
   * @param bootstrapServers passed to {@code prepareProducer} and {@code prepareConsumer}
   * @param numberOfMessages number of records to send
   * @param isEmptyMessages when {@code true} every record is sent with a {@code null} value;
   *     otherwise the value is the bytes of {@code "test message <index>"}
   * @throws ExecutionException if waiting on a send future fails
   * @throws InterruptedException if the thread is interrupted while waiting on a send future
   */
  private void sendKafkaMessages(
      String topicName, String bootstrapServers, int numberOfMessages, boolean isEmptyMessages)
      throws ExecutionException, InterruptedException {
    // try-with-resources guarantees both clients are closed (consumer first, then producer),
    // even when creating the consumer or closing one of the clients throws.
    try (KafkaProducer producer = prepareProducer(bootstrapServers);
        KafkaConsumer consumer = prepareConsumer(bootstrapServers)) {
      // send sync messages
      for (int index = 0; index < numberOfMessages; index++) {
        byte[] payload =
            isEmptyMessages
                ? null
                : String.format("test message %d", index)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ProducerRecord record = new ProducerRecord(topicName, payload);
        // Block on the returned future so each send completes before the next one starts.
        producer.send(record).get();
      }
      consumer.subscribe(ImmutableList.of(topicName));
      await()
          .atMost(MAX_AWAIT_TIME_IN_SEC, TimeUnit.SECONDS)
          .untilAsserted(
              () -> {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
                // Compute the count first: string concatenation binds tighter than != and ?:,
                // so inlining the ternary into the concatenation would drop the label and the
                // null guard.
                int recordCount = records != null ? records.count() : -1;
                System.out.println("found num record:" + recordCount);
                Assert.assertTrue(recordCount > 0);
              });
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



