override def onMessage()

in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [151:182]


  override def onMessage(rec: ConsumerRecord[K, V]): Unit =
    inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) -> rec.offset()))

  override protected def stopConsumerActor(): Unit =
    sourceActor.ref
      .tell(Drain(
          inFlightRecords.assigned(),
          Some(consumerActor),
          KafkaConsumerActor.Internal.StopFromStage(id)),
        sourceActor.ref)

  override protected def addToPartitionAssignmentHandler(
      handler: PartitionAssignmentHandler): PartitionAssignmentHandler = {
    val blockingRevokedCall = new PartitionAssignmentHandler {
      override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

      // This is invoked in the KafkaConsumerActor thread when doing poll.
      override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        if (waitForDraining(revokedTps)) {
          sourceActor.ref.tell(Revoked(revokedTps.toList), consumerActor)
        } else {
          sourceActor.ref.tell(Failure(new Error("Timeout while draining")), consumerActor)
          consumerActor.tell(KafkaConsumerActor.Internal.StopFromStage(id), consumerActor)
        }

      override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
        onRevoke(lostTps, consumer)

      override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
    }
    new PartitionAssignmentHelpers.Chain(handler, blockingRevokedCall)
  }