in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [337:398]
override def revoke(revokedTps: Set[TopicPartition]): Unit =
inFlightRecords = inFlightRecords -- revokedTps
override def reset(): Unit = inFlightRecords = Map.empty
override def empty(partitions: Set[TopicPartition]): Boolean = partitions.flatMap(inFlightRecords.get(_)).isEmpty
override def toString: String = inFlightRecords.toString()
override def assigned(): Set[TopicPartition] = inFlightRecords.keySet
}
}
}
@InternalApi
private final class TransactionalSubSourceStageLogic[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V]) extends SubSourceStageLogic[K, V, TransactionalMessage[K, V]](shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber)
with TransactionalMessageBuilder[K, V] {
import TransactionalSourceLogic._
private val inFlightRecords = InFlightRecords.empty
override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
override def onMessage(rec: ConsumerRecord[K, V]): Unit =
inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) -> rec.offset()))
override val fromPartitionedSource: Boolean = true
override protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] =
super.messageHandling.orElse(drainHandling).orElse {
case (_, Revoked(tps)) =>
inFlightRecords.revoke(tps.toSet)
}
override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = DoNothing
private def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] =
drainHandling
.orElse {
case (_, Status.Failure(e)) =>
failStage(e)
case (_, Terminated(ref)) if ref == consumerActor =>
failStage(new ConsumerFailed())
}
override def performShutdown(): Unit = {
log.debug("#{} Completing SubSource for partition {}", actorNumber, tp)
setKeepGoing(true)
if (!isClosed(shape.out)) {