in core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala [110:137]
override protected def logic(
shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]): GraphStageLogic with Control = {
val factory = new SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] {
def create(
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[SubSourceStageLogicControl],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
new CommittableSubSourceStageLogic(shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber,
settings,
_metadataFromRecord)
}
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape,
settings,
subscription,
getOffsetsOnAssign,
onRevoke,
subSourceStageLogicFactory = factory)
}