in core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala [41:154]
  def plainSink[K, V](settings: ProducerSettings[K, V]): Sink[ProducerRecord[K, V], Future[Done]] =
    Flow[ProducerRecord[K, V]]
      .map(Message(_, NotUsed))
      .via(flexiFlow(settings))
      .toMat(Sink.ignore)(Keep.right)
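
  // A minimal usage sketch of `plainSink`; the implicit ActorSystem `system`, the
  // broker address and the topic name are illustrative assumptions, not part of
  // this file:
  //
  //   import org.apache.kafka.common.serialization.StringSerializer
  //   import org.apache.pekko.stream.scaladsl.Source
  //
  //   val producerSettings =
  //     ProducerSettings(system, new StringSerializer, new StringSerializer)
  //       .withBootstrapServers("localhost:9092")
  //   val done: Future[Done] =
  //     Source(1 to 100)
  //       .map(n => new ProducerRecord[String, String]("topic1", n.toString))
  //       .runWith(Producer.plainSink(producerSettings))
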
  /**
   * Create a sink for publishing records to Kafka topics.
   *
   * The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which the record is being sent, an optional
   * partition number, and an optional key and value.
   *
   * Supports sharing a Kafka Producer instance.
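   *
   * A sketch of the non-deprecated equivalent named in the deprecation message below;
   * `kafkaProducer` stands in for an assumed, externally created producer instance:
   * {{{
   * val sink = Producer.plainSink(producerSettings.withProducer(kafkaProducer))
   * }}}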
   */
  @deprecated(
    "Pass in external or shared producer using ProducerSettings.withProducerFactory or ProducerSettings.withProducer",
    "Alpakka Kafka 2.0.0")
  def plainSink[K, V](
      settings: ProducerSettings[K, V],
      producer: org.apache.kafka.clients.producer.Producer[K, V]): Sink[ProducerRecord[K, V], Future[Done]] =
    plainSink(settings.withProducer(producer))

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
   * been published successfully to the topic.
   *
   * It publishes records to Kafka topics conditionally:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Note that there is a risk that something fails after publishing but before
   * committing, so it provides "at-least-once delivery" semantics.
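   *
   * A sketch of the three envelope variants listed above; `record`, `records` and
   * `msg` (a [[ConsumerMessage.CommittableMessage]]) are assumed values:
   * {{{
   * ProducerMessage.single(record, msg.committableOffset)   // publish one record, then commit
   * ProducerMessage.multi(records, msg.committableOffset)   // publish several records, then commit
   * ProducerMessage.passThrough[String, String, ConsumerMessage.Committable](msg.committableOffset) // commit only
   * }}}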
   */
  @deprecated("use `committableSink(ProducerSettings, CommitterSettings)` instead", "Alpakka Kafka 2.0.0")
  def committableSink[K, V](
      settings: ProducerSettings[K, V]): Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] =
    flexiFlow[K, V, ConsumerMessage.Committable](settings)
      .mapAsync(settings.parallelism)(_.passThrough.commitInternal())
      .toMat(Sink.ignore)(Keep.right)

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]]. It will commit the consumer offset when the message has
   * been published successfully to the topic.
   *
   * It publishes records to Kafka topics conditionally:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Note that there is a risk that something fails after publishing but before
   * committing, so it provides "at-least-once delivery" semantics.
   *
   * Supports sharing a Kafka Producer instance.
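   *
   * A sketch of sharing one producer instance across several sinks; `sharedProducer`
   * and `producerSettings` are assumed values, the producer created via
   * [[ProducerSettings#createKafkaProducer]]:
   * {{{
   * val sharedProducer = producerSettings.createKafkaProducer()
   * val sink1 = Producer.committableSink(producerSettings, sharedProducer)
   * val sink2 = Producer.committableSink(producerSettings, sharedProducer)
   * }}}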
   */
  @deprecated("use `committableSink(ProducerSettings, CommitterSettings)` instead", "Alpakka Kafka 2.0.0")
  def committableSink[K, V](
      settings: ProducerSettings[K, V],
      producer: org.apache.kafka.clients.producer.Producer[K, V])
      : Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] =
    committableSink(settings.withProducer(producer))

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]]. The offsets are batched and committed regularly.
   *
   * It publishes records to Kafka topics conditionally:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Note that there is a risk that something fails after publishing but before
   * committing, so it provides "at-least-once delivery" semantics.
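   *
   * A sketch of the consume-transform-produce cycle this sink enables;
   * `consumerSettings`, `producerSettings`, `committerSettings` and the topic
   * names are assumed values:
   * {{{
   * Consumer
   *   .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
   *   .map { msg =>
   *     ProducerMessage.single(
   *       new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
   *       msg.committableOffset)
   *   }
   *   .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
   *   .run()
   * }}}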
   */
  def committableSink[K, V](
      producerSettings: ProducerSettings[K, V],
      committerSettings: CommitterSettings): Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] =
    Sink.fromGraph(new CommittingProducerSinkStage(producerSettings, committerSettings))

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]] passed as
   * context from a [[Consumer.sourceWithOffsetContext]]. The offsets are batched and committed regularly.
   *
   * It publishes records to Kafka topics conditionally:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Note that there is a risk that something fails after publishing but before
   * committing, so it provides "at-least-once delivery" semantics.
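   *
   * A sketch with the offset flowing as context; `consumerSettings`,
   * `producerSettings`, `committerSettings` and the topic names are assumed values:
   * {{{
   * Consumer
   *   .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("source-topic"))
   *   .map(record => ProducerMessage.single(new ProducerRecord("sink-topic", record.key, record.value)))
   *   .toMat(Producer.committableSinkWithOffsetContext(producerSettings, committerSettings))(DrainingControl.apply)
   *   .run()
   * }}}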
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
  def committableSinkWithOffsetContext[K, V](
      producerSettings: ProducerSettings[K, V],
      committerSettings: CommitterSettings): Sink[(Envelope[K, V, _], Committable), Future[Done]] =
    committableSink(producerSettings, committerSettings)
      .contramap {
        case (env, offset) =>
          env.withPassThrough(offset)
      }