def consumeTransformProduceTransaction()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala [34:99]

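The excerpt relies on definitions from the surrounding file; `logger` and `pollTimeoutMs` come from the enclosing object. A sketch of the imports it needs (the exact list in the source file is outside the quoted range, so treat this as an assumption):

  import scala.annotation.tailrec
  import scala.concurrent.duration.FiniteDuration
  import scala.jdk.CollectionConverters._

  import com.codahale.metrics.Meter
  import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
  import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, RecordMetadata }
  import org.apache.kafka.common.TopicPartition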

  def consumeTransformProduceTransaction(commitInterval: FiniteDuration)(fixture: KafkaTransactionTestFixture,
      meter: Meter): Unit = {
    val consumer = fixture.consumer
    val producer = fixture.producer
    val msgCount = fixture.msgCount
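    // Progress logging cadence: aim for one log line per logPercentStep percent of msgCount.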
    val logPercentStep = 1
    val loggedStep = if (msgCount > logPercentStep) msgCount / (100 / logPercentStep) else 1

    logger.debug(s"Transaction commit interval: ${commitInterval.toMillis}ms")

    var lastProcessedOffset = 0L
    var accumulatedMsgCount = 0L
    var lastCommit = 0L

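    // Attach the consumed offsets to the open transaction and commit it, so the
    // records produced to the sink topic and the consumer's progress on the
    // source topic become visible atomically.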
    def doCommit(): Unit = {
      accumulatedMsgCount = 0
      val offsetMap = Map(new TopicPartition(fixture.sourceTopic, 0) -> new OffsetAndMetadata(lastProcessedOffset))
      logger.debug("Committing offset " + offsetMap.head._2.offset())
      producer.sendOffsetsToTransaction(offsetMap.asJava, new ConsumerGroupMetadata(fixture.groupId))
      producer.commitTransaction()
    }

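    // Open a new transaction and remember its start time for the commit-interval check.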
    def beginTransaction(): Unit = {
      logger.debug("Beginning transaction")
      lastCommit = System.nanoTime()
      producer.beginTransaction()
    }

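    // Tail-recursive consume-transform-produce loop: poll, forward each record to
    // the sink topic, and stop (committing the final transaction) once readLimit
    // records have been read.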
    @tailrec
    def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
      if (readSoFar >= readLimit) {
        doCommit()
        readSoFar
      } else {
        logger.debug("Polling")
        val records = consumer.poll(pollTimeoutMs)
        for (record <- records.iterator().asScala) {
          accumulatedMsgCount = accumulatedMsgCount + 1
          lastProcessedOffset = record.offset()

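          // Forward the record to the sink topic, keeping its partition and key;
          // the callback marks the meter when the broker acknowledges the write.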
          val producerRecord = new ProducerRecord(fixture.sinkTopic, record.partition(), record.key(), record.value())
          producer.send(producerRecord,
            new Callback {
              override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = meter.mark()
            })
          if (lastProcessedOffset % loggedStep == 0)
            logger.info(
              s"Transformed $lastProcessedOffset elements to Kafka (${100 * lastProcessedOffset / msgCount}%)")

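          // Once the commit interval has elapsed, commit the open transaction
          // and immediately start the next one.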
          if (System.nanoTime() >= lastCommit + commitInterval.toNanos) {
            doCommit()
            beginTransaction()
          }
        }
        val recordCount = records.count()
        logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
        pollInLoop(readLimit, readSoFar + recordCount)
      }

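    // Driver: initTransactions() registers the transactional producer (fencing any
    // earlier instance with the same transactional.id), then the loop runs until
    // msgCount records have been processed.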
    meter.mark()
    logger.debug("Initializing transactions")
    producer.initTransactions()
    beginTransaction()
    pollInLoop(readLimit = fixture.msgCount)
    fixture.close()
  }
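
A minimal invocation sketch, assuming the method lives on the `KafkaTransactionBenchmarks` object and that the benchmark harness supplies the fixture (hypothetical wiring, not the harness's actual entry point):

  import scala.concurrent.duration._
  import com.codahale.metrics.Meter

  val fixture: KafkaTransactionTestFixture = ??? // built by the benchmark's fixture generator
  val meter = new Meter()
  // Run the transactional pass, committing every 100 milliseconds.
  KafkaTransactionBenchmarks.consumeTransformProduceTransaction(100.millis)(fixture, meter)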