uforwarder-container/src/integrationTest/java/com/uber/data/kafka/consumerproxy/container/UforwarderContainerIntegrationTests.java [221:295]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        .setRpcTimeoutMs(1000)
                        .setProcedure(procedureName)
                        .setDlqCluster(TEST_CLUSTER_NAME)
                        .setDlqTopic(dlqTopicName)
                        .build())
                .setFlowControl(
                    FlowControl.newBuilder()
                        .setBytesPerSec(10)
                        .setMaxInflightMessages(10)
                        .setMessagesPerSec(10)
                        .build())
                .setRetryConfig(retryConfigBuilder)
                .build())
        .setJobGroupState(JobState.JOB_STATE_RUNNING)
        .build();
  }

  private void prepareTopic(String topicName, String bootstrapServers) {
    KafkaUtils.createTopic(topicName, 1, bootstrapServers);
  }

  private KafkaProducer<Byte[], Byte[]> prepareProducer(String bootstrapServers) {
    Properties producerProps = new Properties();
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "test");
    producerProps.setProperty(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.setProperty(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    return new KafkaProducer<>(producerProps);
  }

  private KafkaConsumer<Byte[], Byte[]> prepareConsumer(String bootstrapServers) {
    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "verify");

    consumerProps.setProperty(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(consumerProps);
  }

  private void sendKafkaMessages(
      String topicName, String bootstrapServers, int numberOfMessages, boolean isEmptyMessages)
      throws ExecutionException, InterruptedException {
    KafkaProducer producer = prepareProducer(bootstrapServers);
    KafkaConsumer consumer = prepareConsumer(bootstrapServers);

    try {
      // send sync messages
      for (int index = 0; index < numberOfMessages; index++) {
        ProducerRecord record =
            new ProducerRecord(topicName, String.format("test message %d", index).getBytes());
        if (isEmptyMessages) {
          record = new ProducerRecord(topicName, null);
        }
        producer.send(record).get();
      }
      consumer.subscribe(ImmutableList.of(topicName));
      await()
          .atMost(MAX_AWAIT_TIME_IN_SEC, TimeUnit.SECONDS)
          .untilAsserted(
              () -> {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
                System.out.println("found num record:" + records != null ? records.count() : -1);
                Assert.assertTrue(records != null && records.count() != 0);
              });
    } finally {
      producer.close();
      consumer.close();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [388:462]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        .setRpcTimeoutMs(1000)
                        .setProcedure(procedureName)
                        .setDlqCluster(TEST_CLUSTER_NAME)
                        .setDlqTopic(dlqTopicName)
                        .build())
                .setFlowControl(
                    FlowControl.newBuilder()
                        .setBytesPerSec(10)
                        .setMaxInflightMessages(10)
                        .setMessagesPerSec(10)
                        .build())
                .setRetryConfig(retryConfigBuilder)
                .build())
        .setJobGroupState(JobState.JOB_STATE_RUNNING)
        .build();
  }

  private void prepareTopic(String topicName, String bootstrapServers) {
    KafkaUtils.createTopic(topicName, 1, bootstrapServers);
  }

  private KafkaProducer<Byte[], Byte[]> prepareProducer(String bootstrapServers) {
    Properties producerProps = new Properties();
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "test");
    producerProps.setProperty(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.setProperty(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    return new KafkaProducer<>(producerProps);
  }

  private KafkaConsumer<Byte[], Byte[]> prepareConsumer(String bootstrapServers) {
    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "verify");

    consumerProps.setProperty(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(consumerProps);
  }

  private void sendKafkaMessages(
      String topicName, String bootstrapServers, int numberOfMessages, boolean isEmptyMessages)
      throws ExecutionException, InterruptedException {
    KafkaProducer producer = prepareProducer(bootstrapServers);
    KafkaConsumer consumer = prepareConsumer(bootstrapServers);

    try {
      // send sync messages
      for (int index = 0; index < numberOfMessages; index++) {
        ProducerRecord record =
            new ProducerRecord(topicName, String.format("test message %d", index).getBytes());
        if (isEmptyMessages) {
          record = new ProducerRecord(topicName, null);
        }
        producer.send(record).get();
      }
      consumer.subscribe(ImmutableList.of(topicName));
      await()
          .atMost(MAX_AWAIT_TIME_IN_SEC, TimeUnit.SECONDS)
          .untilAsserted(
              () -> {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
                System.out.println("found num record:" + records != null ? records.count() : -1);
                Assert.assertTrue(records != null && records.count() != 0);
              });
    } finally {
      producer.close();
      consumer.close();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



