in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala [55:85]
/**
 * Builds the consumer settings used by the committable-sink benchmark source.
 *
 * Keys are deserialized as raw bytes and values as strings. The group id and the
 * client id are each taken from a separate `randomId()` call, and
 * `auto.offset.reset` is set to `"earliest"`.
 *
 * The result type is declared explicitly so that this helper mirrors
 * `createProducerSettings` instead of relying on inference.
 *
 * @param kafkaHost bootstrap servers string handed to the Kafka consumer
 * @param actorSystem actor system the settings are created from
 * @return consumer settings for `Array[Byte]` keys and `String` values
 */
private def createConsumerSettings(
    kafkaHost: String)(implicit actorSystem: ActorSystem): ConsumerSettings[Array[Byte], String] =
  ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(kafkaHost)
    .withGroupId(randomId())
    .withClientId(randomId())
    // NOTE(review): presumably chosen so a brand-new group reads the pre-filled topic from its start — confirm.
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
/**
 * Builds the producer settings used by the committable-sink benchmark.
 *
 * Keys are serialized as raw bytes and values as strings; the only additional
 * configuration applied here is the bootstrap servers address.
 *
 * @param kafkaHost bootstrap servers string handed to the Kafka producer
 * @param actorSystem actor system the settings are created from
 * @return producer settings for `Array[Byte]` keys and `String` values
 */
private def createProducerSettings(
    kafkaHost: String)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] = {
  val keySerializer = new ByteArraySerializer
  val valueSerializer = new StringSerializer
  val baseSettings = ProducerSettings(actorSystem, keySerializer, valueSerializer)
  baseSettings.withBootstrapServers(kafkaHost)
}
/**
 * Creates the fixture generator for the committable-sink benchmark.
 *
 * Each time the generator function runs it:
 *  1. calls `fillTopic` with the command's filled-topic description and Kafka host,
 *  2. obtains a sink topic name from `randomId()`,
 *  3. builds a committable source subscribed to `c.filledTopic.topic`,
 *  4. builds a committable sink from the producer settings and
 *     `CommitterSettings(actorSystem)`,
 *  5. bundles the topic names, the message count, the source and the sink into a
 *     `PekkoConnectorsCommittableSinkTestFixture`.
 *
 * @param c benchmark command supplying the Kafka host and the topic to read from
 * @param actorSystem actor system used to create consumer, producer and committer settings
 * @return a `FixtureGen` pairing `c` with the generator function described above
 */
def producerSink(c: RunTestCommand)(implicit actorSystem: ActorSystem) =
  FixtureGen[PekkoConnectorsCommittableSinkTestFixture[Message, ProducerMessage]](
    c,
    messageCount => {
      // fillTopic runs before any stream stage is constructed.
      // NOTE(review): presumably it publishes the benchmark messages to the input topic — confirm.
      fillTopic(c.filledTopic, c.kafkaHost)
      val targetTopic = randomId()

      val consumerSettings = createConsumerSettings(c.kafkaHost)
      val subscription = Subscriptions.topics(c.filledTopic.topic)
      val committableSource: Source[Message, Control] =
        Consumer.committableSource(consumerSettings, subscription)

      val producerSettings = createProducerSettings(c.kafkaHost)
      val committerSettings = CommitterSettings(actorSystem)
      val committableSink: Sink[ProducerMessage, Future[Done]] =
        Producer.committableSink(producerSettings, committerSettings)

      PekkoConnectorsCommittableSinkTestFixture[Message, ProducerMessage](
        c.filledTopic.topic,
        targetTopic,
        messageCount,
        committableSource,
        committableSink)
    })