in core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala [46:106]
def source[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[TransactionalMessage[K, V], Control] =
Source.fromGraph(new TransactionalSource[K, V](settings, subscription))
/**
* API MAY CHANGE
*
* This source is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Transactional.flowWithOffsetContext]].
*/
@ApiMayChange
def sourceWithOffsetContext[K, V](
settings: ConsumerSettings[K, V],
subscription: Subscription): SourceWithContext[ConsumerRecord[K, V], PartitionOffset, Control] =
Source
.fromGraph(new TransactionalSourceWithOffsetContext[K, V](settings, subscription))
.asSourceWithContext(_._2)
.map(_._1)
/**
* Internal API. Work in progress.
*
* The `partitionedSource` is a way to track automatic partition assignment from kafka.
* Each source is setup for for Exactly Only Once (EoS) kafka message semantics.
* To enable EoS it's necessary to use the [[Transactional.sink]] or [[Transactional.flow]] (for passthrough).
* When Kafka rebalances partitions, all sources complete before the remaining sources are issued again.
*
* By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run
* without having to manually assign partitions to each instance.
*/
@ApiMayChange
@InternalApi
private[kafka] def partitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription): Source[(TopicPartition, Source[TransactionalMessage[K, V], NotUsed]), Control] =
Source.fromGraph(new TransactionalSubSource[K, V](settings, subscription))
/**
* Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will
* initialize, begin, produce, and commit the consumer offset as part of a transaction.
*/
def sink[K, V](
settings: ProducerSettings[K, V],
transactionalId: String): Sink[Envelope[K, V, ConsumerMessage.PartitionOffset], Future[Done]] =
flow(settings, transactionalId).toMat(Sink.ignore)(Keep.right)
/**
* API MAY CHANGE
*
* Sink that requires the context to be [[ConsumerMessage.PartitionOffset]] from a [[Transactional.sourceWithOffsetContext]].
* It will initialize, begin, produce, and commit the consumer offset as part of a transaction.
*/
@ApiMayChange
def sinkWithOffsetContext[K, V](
settings: ProducerSettings[K, V],
transactionalId: String): Sink[(Envelope[K, V, NotUsed], PartitionOffset), Future[Done]] =
sink(settings, transactionalId)
.contramap {
case (env, offset) =>
env.withPassThrough(offset)
}