def initialize()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala [52:81]


  /**
   * Creates the fixture generator for the Kafka transaction benchmarks.
   *
   * Each invocation of the generator function fills `c.filledTopic` via `fillTopic`, then builds:
   *  - a `KafkaConsumer` with a fresh group id and client id, reading from the earliest offset
   *    with `read_committed` isolation, already subscribed to `c.filledTopic.topic`;
   *  - a `KafkaProducer` with idempotence enabled and a fresh transactional id.
   *
   * This method does not call `initTransactions()` on the producer and does not close either
   * client on success; both are handed to the returned fixture.
   *
   * @param c the benchmark command supplying the Kafka bootstrap host (`kafkaHost`) and the
   *          topic to fill and read from (`filledTopic`)
   * @return a `FixtureGen` whose generator takes the message count and yields a
   *         `KafkaTransactionTestFixture` (source topic, sink topic, message count, group id,
   *         consumer, producer)
   */
  def initialize(c: RunTestCommand) =
    FixtureGen[KafkaTransactionTestFixture](
      c,
      msgCount => {
        fillTopic(c.filledTopic, c.kafkaHost)
        val groupId = randomId()
        val sinkTopic = randomId()

        val consumerJavaProps = new java.util.Properties
        consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
        consumerJavaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
        consumerJavaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
        consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
        consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Kafka expects the isolation level as a lower-case string; a fixed locale keeps the
        // conversion independent of the JVM default locale.
        consumerJavaProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
          IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH))
        val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps)

        // From here on the consumer is a live resource: if any later step fails, close it before
        // propagating the error, because no fixture will exist to own it.
        try {
          consumer.subscribe(Set(c.filledTopic.topic).asJava)

          val producerJavaProps = new java.util.Properties
          producerJavaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
          producerJavaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
          producerJavaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
          producerJavaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true.toString)
          producerJavaProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, randomId())
          val producer = new KafkaProducer[Array[Byte], String](producerJavaProps)

          KafkaTransactionTestFixture(c.filledTopic.topic, sinkTopic, msgCount, groupId, consumer, producer)
        } catch {
          case scala.util.control.NonFatal(e) =>
            // Keep the original failure as the primary error; a close failure is only attached.
            try consumer.close()
            catch { case scala.util.control.NonFatal(closeError) => e.addSuppressed(closeError) }
            throw e
        }
      })