private def createConsumerSettings()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala [55:85]


  /**
   * Builds the consumer settings used by the fixtures in this file.
   *
   * Keys are read with a `ByteArrayDeserializer` and values with a `StringDeserializer`.
   * The group id and the client id each come from their own `randomId()` call, and
   * `auto.offset.reset` is set to `"earliest"`.
   *
   * @param kafkaHost   value handed to `withBootstrapServers`
   * @param actorSystem actor system the settings are created from
   * @return consumer settings for `Array[Byte]` keys and `String` values
   */
  private def createConsumerSettings(
      kafkaHost: String)(implicit actorSystem: ActorSystem): ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaHost)
      .withGroupId(randomId())
      .withClientId(randomId())
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  /**
   * Builds the producer settings used by the fixtures in this file.
   *
   * Keys are written with a `ByteArraySerializer` and values with a `StringSerializer`;
   * the only option applied on top of the defaults is the bootstrap servers value.
   *
   * @param kafkaHost   value handed to `withBootstrapServers`
   * @param actorSystem actor system the settings are created from
   * @return producer settings for `Array[Byte]` keys and `String` values
   */
  private def createProducerSettings(
      kafkaHost: String)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] = {
    val keySerializer = new ByteArraySerializer
    val valueSerializer = new StringSerializer
    val baseSettings = ProducerSettings(actorSystem, keySerializer, valueSerializer)
    baseSettings.withBootstrapServers(kafkaHost)
  }

  /**
   * Fixture generator pairing a committable consumer source with a committable producer sink.
   *
   * Each time the generator function runs it:
   *  1. fills `c.filledTopic` on `c.kafkaHost` via `fillTopic`,
   *  2. picks a sink topic name with `randomId()` (the name is only passed to the fixture here),
   *  3. builds a committable source subscribed to the filled topic,
   *  4. builds a committable sink using `CommitterSettings(actorSystem)`,
   *  5. bundles both topic names, the message count, the source and the sink into the fixture.
   *
   * @param c           benchmark command supplying the Kafka host and the topic to fill
   * @param actorSystem actor system used for the consumer, producer and committer settings
   */
  def producerSink(c: RunTestCommand)(implicit actorSystem: ActorSystem) =
    FixtureGen[PekkoConnectorsCommittableSinkTestFixture[Message, ProducerMessage]](
      c,
      msgCount => {
        // The input topic is populated before any stream stage is constructed.
        fillTopic(c.filledTopic, c.kafkaHost)
        val outputTopic = randomId()
        val inputTopic = c.filledTopic.topic

        val consumerSettings = createConsumerSettings(c.kafkaHost)
        val subscription = Subscriptions.topics(inputTopic)
        val committableSource: Source[Message, Control] =
          Consumer.committableSource(consumerSettings, subscription)

        val producerSettings = createProducerSettings(c.kafkaHost)
        val committerSettings = CommitterSettings(actorSystem)
        val committableSink: Sink[ProducerMessage, Future[Done]] =
          Producer.committableSink(producerSettings, committerSettings)

        PekkoConnectorsCommittableSinkTestFixture[Message, ProducerMessage](
          inputTopic,
          outputTopic,
          msgCount,
          committableSource,
          committableSink)
      })