def plainSink[K, V]()

in core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala [41:154]


  def plainSink[K, V](settings: ProducerSettings[K, V]): Sink[ProducerRecord[K, V], Future[Done]] = {
    // Wrap every incoming record in a Message carrying NotUsed as its pass-through,
    // which is the element shape fed into flexiFlow below.
    val wrapRecord = Flow.fromFunction((record: ProducerRecord[K, V]) => Message(record, NotUsed))

    // The elements emitted by flexiFlow are discarded; only the materialized value
    // of Sink.ignore is kept.
    wrapRecord
      .via(flexiFlow(settings))
      .toMat(Sink.ignore)(Keep.right)
  }

  /**
   * Create a sink for publishing records to Kafka topics, using the Kafka producer passed in by the caller.
   *
   * The [[org.apache.kafka.clients.producer.ProducerRecord Kafka ProducerRecord]] contains the topic name to which
   * the record is being sent, an optional partition number, and an optional key and value.
   *
   * The given `producer` is attached to `settings` with `withProducer`, and the resulting settings are handed to
   * the `plainSink` overload that takes only settings. This supports sharing a Kafka Producer instance.
   *
   * @param settings producer settings the given producer is attached to
   * @param producer the Kafka producer instance to attach
   */
  @deprecated(
    "Pass in external or shared producer using ProducerSettings.withProducerFactory or ProducerSettings.withProducer",
    "Alpakka Kafka 2.0.0")
  def plainSink[K, V](
      settings: ProducerSettings[K, V],
      producer: org.apache.kafka.clients.producer.Producer[K, V]): Sink[ProducerRecord[K, V], Future[Done]] = {
    val settingsWithProducer = settings.withProducer(producer)
    plainSink(settingsWithProducer)
  }

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]]. The consumer offset is committed once the message has
   * been published successfully to the topic.
   *
   * What gets published depends on the kind of envelope:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Because something may fail after publishing but before committing, the semantics are
   * "at-least once delivery".
   *
   * Commits are issued with `mapAsync`, using `settings.parallelism` as the parallelism.
   *
   * @param settings producer settings; also supplies the parallelism used for committing
   */
  @deprecated("use `committableSink(ProducerSettings, CommitterSettings)` instead", "Alpakka Kafka 2.0.0")
  def committableSink[K, V](
      settings: ProducerSettings[K, V]): Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] = {
    // Stage 1: hand the envelopes to flexiFlow, which emits results carrying the committable as pass-through.
    val publishing = flexiFlow[K, V, ConsumerMessage.Committable](settings)

    // Stage 2: commit the offset carried by each emitted result.
    val publishThenCommit = publishing.mapAsync(settings.parallelism) { result =>
      result.passThrough.commitInternal()
    }

    // The commit outcomes are discarded; only the materialized value of Sink.ignore is kept.
    publishThenCommit.toMat(Sink.ignore)(Keep.right)
  }

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]], using the Kafka producer passed in by the caller.
   * The consumer offset is committed once the message has been published successfully to the topic.
   *
   * What gets published depends on the kind of envelope:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Because there is always a risk that something fails after publishing but before committing,
   * the semantics are "at-least once delivery".
   *
   * The given `producer` is attached to `settings` with `withProducer`, and the resulting settings are handed to
   * the `committableSink` overload that takes only producer settings. This supports sharing a Kafka Producer instance.
   *
   * @param settings producer settings the given producer is attached to
   * @param producer the Kafka producer instance to attach
   */
  @deprecated("use `committableSink(ProducerSettings, CommitterSettings)` instead", "Alpakka Kafka 2.0.0")
  def committableSink[K, V](
      settings: ProducerSettings[K, V],
      producer: org.apache.kafka.clients.producer.Producer[K, V])
      : Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] = {
    val settingsWithProducer = settings.withProducer(producer)
    committableSink(settingsWithProducer)
  }

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]]
   * from a [[Consumer.committableSource]]. Offsets are batched and committed regularly.
   *
   * What gets published depends on the kind of envelope:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Because something may fail after publishing but before committing, the semantics are
   * "at-least once delivery".
   *
   * The sink is built from a `CommittingProducerSinkStage` constructed with both settings objects.
   *
   * @param producerSettings settings passed to the stage for producing
   * @param committerSettings settings passed to the stage for committing
   */
  def committableSink[K, V](
      producerSettings: ProducerSettings[K, V],
      committerSettings: CommitterSettings): Sink[Envelope[K, V, ConsumerMessage.Committable], Future[Done]] =
    Sink.fromGraph {
      // Kept inline as the argument so the stage is typed against the expected sink type.
      new CommittingProducerSinkStage(producerSettings, committerSettings)
    }

  /**
   * Create a sink that is aware of the [[ConsumerMessage.Committable committable offset]] passed as
   * context from a [[Consumer.sourceWithOffsetContext]]. Offsets are batched and committed regularly.
   *
   * What gets published depends on the kind of envelope:
   *
   * - [[pekko.kafka.ProducerMessage.Message Message]] publishes a single message to its topic, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.MultiMessage MultiMessage]] publishes all messages in its `records` field, and commits the offset
   *
   * - [[pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, but commits the offset
   *
   * Because something may fail after publishing but before committing, the semantics are
   * "at-least once delivery".
   *
   * Each incoming `(envelope, offset)` pair is turned into a single envelope by setting the offset as the
   * envelope's pass-through, and is then handled by `committableSink(producerSettings, committerSettings)`.
   *
   * @param producerSettings settings forwarded to `committableSink`
   * @param committerSettings settings forwarded to `committableSink`
   */
  @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/880")
  def committableSinkWithOffsetContext[K, V](
      producerSettings: ProducerSettings[K, V],
      committerSettings: CommitterSettings): Sink[(Envelope[K, V, _], Committable), Future[Done]] = {
    // Replaces whatever pass-through the envelope carried with the offset from the context.
    val attachOffset: ((Envelope[K, V, _], Committable)) => Envelope[K, V, Committable] = {
      case (envelope, committable) => envelope.withPassThrough(committable)
    }

    committableSink(producerSettings, committerSettings).contramap(attachOffset)
  }