private def createConsumerSettings()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala [51:82]


  /**
   * Builds the consumer settings for the transactional benchmark source.
   *
   * Keys are read with a `ByteArrayDeserializer` and values with a `StringDeserializer`. Both the group id
   * and the client id are taken from separate `randomId()` calls, and
   * `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is set to `"earliest"`.
   *
   * @param kafkaHost   value passed to `withBootstrapServers`
   * @param actorSystem actor system the settings are created from
   * @return consumer settings typed as `ConsumerSettings[Array[Byte], String]`
   */
  private def createConsumerSettings(
      kafkaHost: String)(implicit actorSystem: ActorSystem): ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaHost)
      .withGroupId(randomId())
      .withClientId(randomId())
      // NOTE(review): presumably "earliest" is needed so the freshly generated group reads the
      // pre-filled topic from its beginning - confirm against the benchmark's expectations.
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  /**
   * Builds the producer settings for the transactional benchmark flow.
   *
   * Keys are written with a `ByteArraySerializer` and values with a `StringSerializer`.
   *
   * @param kafkaHost   value passed to `withBootstrapServers`
   * @param actorSystem actor system the settings are created from
   * @return producer settings pointing at `kafkaHost`
   */
  private def createProducerSettings(
      kafkaHost: String)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] = {
    val serializerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    serializerSettings.withBootstrapServers(kafkaHost)
  }

  /**
   * Creates a fixture generator wiring a transactional source to a transactional flow.
   *
   * Each time the generator function is invoked it:
   *  1. calls `fillTopic` with the command's `filledTopic` and `kafkaHost`,
   *  1. picks a sink topic name via `randomId()`,
   *  1. builds a `Transactional.source` subscribed to `c.filledTopic.topic`,
   *  1. builds a `Transactional.flow` whose transactional id comes from `randomId()`.
   *
   * @param c              benchmark command supplying the Kafka host and the topic to read from
   * @param commitInterval value passed to `withEosCommitInterval` on the producer settings
   * @param actorSystem    actor system used to create the consumer and producer settings
   * @return a `FixtureGen` producing a [[ReactiveKafkaTransactionTestFixture]] for a given message count
   */
  def transactionalSourceAndSink(c: RunTestCommand, commitInterval: FiniteDuration)(
      implicit actorSystem: ActorSystem)
      : FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]] =
    FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]](
      c,
      msgCount => {
        // The source topic is (re)filled on every fixture generation, before anything is wired up.
        fillTopic(c.filledTopic, c.kafkaHost)
        val sinkTopic = randomId()

        val consumerSettings = createConsumerSettings(c.kafkaHost)
        val source: Source[KTransactionMessage, Control] =
          Transactional.source(consumerSettings, Subscriptions.topics(c.filledTopic.topic))

        val producerSettings = createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval)
        // A fresh random transactional id is generated for every fixture.
        val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings, randomId())

        ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](c.filledTopic.topic,
          sinkTopic,
          msgCount,
          source,
          flow)
      })