in core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala [166:267]
override def stop(): Future[Done] = Future.failed(exception)
override def shutdown(): Future[Done] = Future.failed(exception)
override def isShutdown: Future[Done] = Future.failed(exception)
override def metrics: Future[Map[MetricName, Metric]] = Future.failed(exception)
}
/**
 * Creates a source of plain `ConsumerRecord` elements, passed on as they are received from the
 * underlying `KafkaConsumer`.
 *
 * This source offers no way to commit offsets to Kafka. Use it when offsets are kept in an external
 * store, or together with auto-commit (note that auto-commit is disabled by default).
 *
 * An application is free to bypass Kafka's built-in offset storage and keep offsets in a store of its
 * own choosing. The main motivation is to write both the offset and the result of the consumption to
 * the same system atomically. This is not always possible, but where it is, consumption becomes fully
 * atomic and yields "exactly once" semantics, which are stronger than the "at-least once" semantics
 * obtained with Kafka's offset commit functionality.
 */
def plainSource[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {
  // Build the stage first, then wrap it as a Source.
  val stage = new PlainSource[K, V](settings, subscription)
  Source.fromGraph(stage)
}
/**
 * Creates a source whose elements make it possible to commit offset positions to Kafka.
 * Use it when "at-least once delivery" is wanted: each message will most likely be delivered
 * one time, but may be duplicated in failure cases.
 *
 * Committing the offset before the message is processed gives "at-most once delivery" semantics
 * instead; [[#atMostOnceSource]] exists for that purpose.
 *
 * Unlike auto-commit, this gives exact control over when a message is considered consumed.
 *
 * To store offsets anywhere other than Kafka, use [[#plainSource]] rather than this API.
 */
def committableSource[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription): Source[CommittableMessage[K, V], Control] = {
  // Build the stage first, then wrap it as a Source.
  val stage = new CommittableSource[K, V](settings, subscription)
  Source.fromGraph(stage)
}
/**
 * API MAY CHANGE
 *
 * Creates a source that emits each `ConsumerRecord` as the element and carries the matching offset
 * position as flow context, which makes it possible to commit offset positions to Kafka.
 * Use it when "at-least once delivery" is wanted: each message will most likely be delivered
 * one time, but may be duplicated in failure cases.
 *
 * Intended for use with Apache Pekko's
 * [[https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html flow with context]],
 * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
 */
@ApiMayChange
def sourceWithOffsetContext[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription): SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] = {
  val stage = new SourceWithOffsetContext[K, V](settings, subscription)
  Source
    .fromGraph(stage)
    // The second component of each emitted pair becomes the context ...
    .asSourceWithContext(pair => pair._2)
    // ... and the first component is kept as the element.
    .map(pair => pair._1)
}
/**
 * API MAY CHANGE
 *
 * Creates a source that emits each `ConsumerRecord` as the element and carries the matching offset
 * position as flow context, which makes it possible to commit offset positions to Kafka.
 * Use it when "at-least once delivery" is wanted: each message will most likely be delivered
 * one time, but may be duplicated in failure cases.
 *
 * Intended for use with Apache Pekko's
 * [[https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html flow with context]],
 * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
 *
 * This overload additionally accepts `metadataFromRecord`, which derives a metadata string from the
 * record to be added when an offset is committed. That can be useful, for example, to record which
 * node made the commit, what time the commit was made, the timestamp of the record etc.
 */
@ApiMayChange
def sourceWithOffsetContext[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription,
    metadataFromRecord: ConsumerRecord[K, V] => String)
    : SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] = {
  val stage = new SourceWithOffsetContext[K, V](settings, subscription, metadataFromRecord)
  Source
    .fromGraph(stage)
    // The second component of each emitted pair becomes the context ...
    .asSourceWithContext(pair => pair._2)
    // ... and the first component is kept as the element.
    .map(pair => pair._1)
}
/**
 * Creates a committable source that makes it possible to add additional metadata (in the form of a
 * string), derived from the record by `metadataFromRecord`, when an offset is committed.
 * That can be useful, for example, to record which node made the commit, what time the commit was
 * made, the timestamp of the record etc.
 */
def commitWithMetadataSource[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription,
    metadataFromRecord: ConsumerRecord[K, V] => String): Source[CommittableMessage[K, V], Control] = {
  // Same stage type as `committableSource` uses, constructed with the extra metadata function.
  val stage = new CommittableSource[K, V](settings, subscription, metadataFromRecord)
  Source.fromGraph(stage)
}
/**
* Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
* before being emitted downstream.
*/
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
committableSource[K, V](settings, subscription).mapAsync(1) { m =>