override def revoke()

in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [337:398]


      /**
       * Stops tracking the given partitions by dropping their entries from the in-flight map.
       *
       * @param revokedTps partitions whose entries are removed; partitions without an entry are ignored
       */
      override def revoke(revokedTps: Set[TopicPartition]): Unit = {
        inFlightRecords --= revokedTps
      }

      /** Forgets every tracked entry by replacing the in-flight map with an empty one. */
      override def reset(): Unit = {
        inFlightRecords = Map.empty
      }

      /**
       * Tells whether none of the given partitions currently has an entry in the in-flight map.
       *
       * @param partitions partitions to check
       * @return `true` when no partition in `partitions` has an entry (including when
       *         `partitions` itself is empty), `false` as soon as one of them does
       */
      override def empty(partitions: Set[TopicPartition]): Boolean =
        // Short-circuits on the first tracked partition instead of materialising an
        // intermediate set of offsets only to test it for emptiness.
        !partitions.exists(tp => inFlightRecords.contains(tp))

      /** Renders the current in-flight map, delegating to the map's own string form. */
      override def toString: String = {
        inFlightRecords.toString
      }

      /** @return the partitions that currently have an entry in the in-flight map */
      override def assigned(): Set[TopicPartition] = {
        inFlightRecords.keySet
      }
    }
  }
}

@InternalApi
private final class TransactionalSubSourceStageLogic[K, V](
    shape: SourceShape[TransactionalMessage[K, V]],
    tp: TopicPartition,
    consumerActor: ActorRef,
    subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
    actorNumber: Int,
    consumerSettings: ConsumerSettings[K, V]) extends SubSourceStageLogic[K, V, TransactionalMessage[K, V]](shape,
      tp,
      consumerActor,
      subSourceStartedCb,
      subSourceCancelledCb,
      actorNumber)
    with TransactionalMessageBuilder[K, V] {

  import TransactionalSourceLogic._

  // Per-partition record offsets tracked by this sub-source: `onMessage` adds an entry for every
  // record it is given, and a `Revoked` message removes the revoked partitions again.
  private val inFlightRecords = InFlightRecords.empty

  /**
   * Consumer group id, looked up in the consumer settings' properties under
   * `ConsumerConfig.GROUP_ID_CONFIG`.
   */
  override def groupId: String = {
    consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
  }

  /**
   * Registers the record as in flight by adding its topic-partition and offset to the
   * in-flight records.
   *
   * @param rec the consumer record whose topic, partition and offset are recorded
   */
  override def onMessage(rec: ConsumerRecord[K, V]): Unit = {
    val recordPartition = new TopicPartition(rec.topic(), rec.partition())
    inFlightRecords.add(Map(recordPartition -> rec.offset()))
  }

  // Fixed to `true` for this sub-source logic.
  // NOTE(review): presumably read by TransactionalMessageBuilder when building messages — confirm.
  override val fromPartitionedSource: Boolean = true

  /**
   * Message handler chain for this stage: the inherited handler is tried first, then
   * `drainHandling`, and finally a handler that reacts to `Revoked` by removing the revoked
   * partitions from the in-flight records.
   */
  override protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
    val revokeHandling: PartialFunction[(ActorRef, Any), Unit] = {
      case (_, Revoked(tps)) => inFlightRecords.revoke(tps.toSet)
    }
    super.messageHandling.orElse(drainHandling).orElse(revokeHandling)
  }

  /** Cancellation strategy reported when downstream finishes: always `DoNothing`. */
  override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = {
    DoNothing
  }

  /**
   * Message handler that tries `drainHandling` first and otherwise fails the stage: with the
   * carried exception on `Status.Failure`, or with a `ConsumerFailed` when the consumer actor
   * terminates.
   *
   * NOTE(review): presumably installed while the sub-source is shutting down — the call site is
   * not visible from here; confirm.
   */
  private def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] = {
    val failureHandling: PartialFunction[(ActorRef, Any), Unit] = {
      case (_, Status.Failure(e))                       => failStage(e)
      case (_, Terminated(ref)) if ref == consumerActor => failStage(new ConsumerFailed())
    }
    drainHandling.orElse(failureHandling)
  }

  override def performShutdown(): Unit = {
    log.debug("#{} Completing SubSource for partition {}", actorNumber, tp)
    setKeepGoing(true)
    if (!isClosed(shape.out)) {