in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala [52:81]
def initialize(c: RunTestCommand) =
FixtureGen[KafkaTransactionTestFixture](
c,
msgCount => {
fillTopic(c.filledTopic, c.kafkaHost)
val groupId = randomId()
val sinkTopic = randomId()
val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
consumerJavaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerJavaProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH))
val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps)
consumer.subscribe(Set(c.filledTopic.topic).asJava)
val producerJavaProps = new java.util.Properties
producerJavaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerJavaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
producerJavaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
producerJavaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true.toString)
producerJavaProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, randomId())
val producer = new KafkaProducer[Array[Byte], String](producerJavaProps)
KafkaTransactionTestFixture(c.filledTopic.topic, sinkTopic, msgCount, groupId, consumer, producer)
})