in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala [51:82]
/**
 * Builds the consumer settings used by the transactional fixture.
 *
 * Keys are read with a `ByteArrayDeserializer` and values with a `StringDeserializer`.
 * The group id and the client id are each taken from a separate `randomId()` call, and
 * `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is set to "earliest".
 *
 * @param kafkaHost bootstrap servers string handed to `withBootstrapServers`
 * @param actorSystem actor system the settings are created from
 */
private def createConsumerSettings(kafkaHost: String)(implicit actorSystem: ActorSystem) = {
  val baseSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
  val connectedSettings = baseSettings.withBootstrapServers(kafkaHost)
  // Group id is generated before the client id, matching the original call order.
  val identifiedSettings = connectedSettings.withGroupId(randomId()).withClientId(randomId())
  identifiedSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
/**
 * Builds the producer settings used by the transactional fixture.
 *
 * Keys are written with a `ByteArraySerializer` and values with a `StringSerializer`.
 *
 * @param kafkaHost bootstrap servers string handed to `withBootstrapServers`
 * @param actorSystem actor system the settings are created from
 * @return producer settings pointing at `kafkaHost`
 */
private def createProducerSettings(
kafkaHost: String)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] = {
  val baseSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
  baseSettings.withBootstrapServers(kafkaHost)
}
/**
 * Creates a `FixtureGen` that produces a transactional source/flow test fixture.
 *
 * Each time the generator function runs it:
 *  1. calls `fillTopic` for `c.filledTopic` on `c.kafkaHost`,
 *  2. picks an output topic name via `randomId()`,
 *  3. builds a `Transactional.source` subscribed to the filled topic,
 *  4. builds a `Transactional.flow` whose producer settings carry `commitInterval`
 *     (via `withEosCommitInterval`) and whose transactional id comes from `randomId()`.
 *
 * @param c benchmark command supplying the Kafka host and the topic to read from
 * @param commitInterval value passed to `withEosCommitInterval` on the producer settings
 * @param actorSystem actor system used to create consumer and producer settings
 */
def transactionalSourceAndSink(c: RunTestCommand, commitInterval: FiniteDuration)(implicit actorSystem: ActorSystem) =
  FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]](
    c,
    messageCount => {
      // The input topic is filled before any stream stage is constructed.
      fillTopic(c.filledTopic, c.kafkaHost)
      val inputTopic = c.filledTopic.topic
      val outputTopic = randomId()

      val transactionalSource: Source[KTransactionMessage, Control] =
        Transactional.source(createConsumerSettings(c.kafkaHost), Subscriptions.topics(inputTopic))

      val transactionalFlow: Flow[KProducerMessage, KResult, NotUsed] =
        Transactional.flow(
          createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval),
          randomId())

      ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](
        inputTopic,
        outputTopic,
        messageCount,
        transactionalSource,
        transactionalFlow)
    })