override protected def logic()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala [110:137]


  /**
   * Builds the stage logic for this source.
   *
   * The returned `SubSourceLogic` is configured with this stage's `settings`, `subscription`,
   * `getOffsetsOnAssign` and `onRevoke`, plus a factory that creates a
   * `CommittableSubSourceStageLogic` for each sub-source it is asked to create.
   *
   * @param shape the source shape emitting `(TopicPartition, Source)` pairs
   * @return the stage logic, which also acts as the stage's `Control`
   */
  override protected def logic(
      shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]): GraphStageLogic with Control = {

    // Factory for the per-partition sub-source logic. Besides the arguments it receives,
    // each created logic is also given this stage's `settings` and `_metadataFromRecord`.
    val committableSubSourceFactory: SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] =
      new SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] {
        def create(
            shape: SourceShape[CommittableMessage[K, V]],
            tp: TopicPartition,
            consumerActor: ActorRef,
            subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
            subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
            actorNumber: Int): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
          // Note: `shape` here is the sub-source shape (it shadows the outer stage shape).
          new CommittableSubSourceStageLogic(
            shape,
            tp,
            consumerActor,
            subSourceStartedCb,
            subSourceCancelledCb,
            actorNumber,
            settings,
            _metadataFromRecord)
      }

    // The outer logic receives the stage-level shape and the factory defined above.
    new SubSourceLogic[K, V, CommittableMessage[K, V]](
      shape,
      settings,
      subscription,
      getOffsetsOnAssign,
      onRevoke,
      subSourceStageLogicFactory = committableSubSourceFactory)
  }