in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [361:419]
/**
 * Builds the [[SubSourceStageLogic]] that drives one sub-source stage.
 *
 * The parameters line up one-to-one with the constructor parameters of
 * [[SubSourceStageLogic]]. NOTE(review): implementations presumably forward them
 * unchanged to a concrete logic subclass -- confirm against the concrete factories.
 *
 * @param shape source shape of the stage the logic is created for
 * @param tp topic partition the created logic is bound to
 * @param consumerActor reference to the consumer actor handed to the logic
 * @param subSourceStartedCb async callback accepting a [[SubSourceStageLogicControl]];
 *                           presumably invoked once the sub-source has started (verify in the logic)
 * @param subSourceCancelledCb async callback accepting the partition together with a
 *                             [[SubSourceCancellationStrategy]]; presumably invoked when the
 *                             sub-source is cancelled (verify in the logic)
 * @param actorNumber number handed to the logic; [[SubSourceStageLogic]] appends it to its `id`
 * @return the stage logic for the given partition
 */
def create(
shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int): SubSourceStageLogic[K, V, Msg]
}
}
/**
 * Internal API
 *
 * Source-shaped [[GraphStage]] with a single outlet named `"out"`. One [[SubSourceStage]]
 * is created per partition in [[SubSourceLogic]].
 *
 * The stage carries no behaviour of its own: it only stores its constructor arguments and
 * hands them, together with its shape, to `subSourceStageLogicFactory` when the stage logic
 * is requested.
 */
@InternalApi
private final class SubSourceStage[K, V, Msg](
    tp: TopicPartition,
    consumerActor: ActorRef,
    subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
    actorNumber: Int,
    subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg])
    extends GraphStage[SourceShape[Msg]] {

  /** The only port of this stage; messages of type `Msg` leave through it. */
  val out: Outlet[Msg] = Outlet[Msg]("out")

  /** Shape consisting solely of [[out]]. */
  val shape: SourceShape[Msg] = new SourceShape[Msg](out)

  /** Delegates creation of the stage logic to the factory passed to the constructor. */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    val logic: GraphStageLogic =
      subSourceStageLogicFactory.create(
        shape,
        tp,
        consumerActor,
        subSourceStartedCb,
        subSourceCancelledCb,
        actorNumber)
    logic
  }
}
/**
* Internal API
*
* A [[SubSourceStageLogic]] is the [[GraphStageLogic]] of a [[SubSourceStage]].
* This emits Kafka messages downstream (not sources).
*/
@InternalApi
private abstract class SubSourceStageLogic[K, V, Msg](
val shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int) extends GraphStageLogic(shape)
with PromiseControl
with MetricsControl
with StageIdLogging
with MessageBuilder[K, V, Msg]
with SourceLogicBuffer[K, V, Msg] {
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
override def id: String = s"${super.id}#$actorNumber"
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
private var requested = false
protected var subSourceActor: StageActor = _
override def preStart(): Unit = {