def create()

in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [361:419]


    /**
     * Builds the [[SubSourceStageLogic]] for a single [[SubSourceStage]].
     *
     * The stage's `createLogic` calls this with its own shape and the values it was constructed with.
     *
     * @param shape the source shape of the stage the logic is created for
     * @param tp the topic partition passed through from the stage
     * @param consumerActor the consumer actor reference passed through from the stage
     * @param subSourceStartedCb async callback accepting a [[SubSourceStageLogicControl]];
     *                           presumably invoked once the sub-source has started (not visible here, confirm in implementations)
     * @param subSourceCancelledCb async callback accepting the topic partition together with a
     *                             [[SubSourceCancellationStrategy]]; presumably invoked on cancellation (confirm in implementations)
     * @param actorNumber number passed through from the stage
     * @return the stage logic to run for the sub-source
     */
    def create(
        shape: SourceShape[Msg],
        tp: TopicPartition,
        consumerActor: ActorRef,
        subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
        subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
        actorNumber: Int): SubSourceStageLogic[K, V, Msg]
  }
}

/**
 * Internal API
 *
 * Source-shaped graph stage; [[SubSourceLogic]] creates one [[SubSourceStage]] per partition.
 * The stage holds no behaviour of its own: its logic is built by the supplied
 * [[SubSourceStageLogicFactory]] from the values given to this constructor.
 */
@InternalApi
private final class SubSourceStage[K, V, Msg](
    tp: TopicPartition,
    consumerActor: ActorRef,
    subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
    actorNumber: Int,
    subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg]) extends GraphStage[SourceShape[Msg]] { stage =>

  /** The single outlet of this stage, named "out". */
  val out: Outlet[Msg] = Outlet[Msg]("out")

  /** Source shape wrapping [[out]]. */
  val shape: SourceShape[Msg] = SourceShape(out)

  /**
   * Hands the shape and all constructor values over to the factory.
   * The inherited attributes are not consulted.
   */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    subSourceStageLogicFactory.create(
      shape,
      tp,
      consumerActor,
      subSourceStartedCb,
      subSourceCancelledCb,
      actorNumber)
  }
}

/**
 * Internal API
 *
 * A [[SubSourceStageLogic]] is the [[GraphStageLogic]] of a [[SubSourceStage]].
 * This emits Kafka messages downstream (not sources).
 */
@InternalApi
private abstract class SubSourceStageLogic[K, V, Msg](
    val shape: SourceShape[Msg],
    tp: TopicPartition,
    consumerActor: ActorRef,
    subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
    actorNumber: Int) extends GraphStageLogic(shape)
    with PromiseControl
    with MetricsControl
    with StageIdLogging
    with MessageBuilder[K, V, Msg]
    with SourceLogicBuffer[K, V, Msg] {

  // The execution context is taken from this stage's materializer.
  override def executionContext: ExecutionContext = materializer.executionContext
  // The consumer actor is supplied at construction time, so the future is already completed.
  override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
  // Extends the inherited id with `#` followed by this sub-source's actor number.
  override def id: String = s"${super.id}#$actorNumber"
  // Request message built once, covering only this stage's topic partition.
  // NOTE(review): the meaning of the leading `0` argument is not visible here; confirm against RequestMessages.
  private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
  // Starts out false; presumably tracks whether a message request is outstanding (confirm against usages).
  private var requested = false
  // Left at its default (null) here; expected to be assigned later, e.g. in preStart (TODO confirm).
  protected var subSourceActor: StageActor = _

  override def preStart(): Unit = {