in core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala [90:267]
override def stop(): CompletionStage[Done] = {
  // Pure delegation: the stop request is forwarded to the wrapped `control`.
  control.stop()
}
override def shutdown(): CompletionStage[Done] = {
  // Pure delegation: the shutdown request is forwarded to the wrapped `control`.
  control.shutdown()
}
override def drainAndShutdown[S](streamCompletion: CompletionStage[S], ec: Executor): CompletionStage[S] = {
  // Uses the caller-supplied completion stage (the parameter shadows any member of the
  // same name) and hands both arguments to the wrapped `control` unchanged.
  control.drainAndShutdown(streamCompletion, ec)
}
/**
* Stop producing messages from the `Source`, wait for stream completion
* and shut down the consumer `Source`. It will wait for outstanding offset
* commit requests to finish before shutting down.
*/
def drainAndShutdown(ec: Executor): CompletionStage[T] = {
  // No completion stage is passed in here: the `streamCompletion` already in scope
  // (not a parameter of this overload) is forwarded together with the executor.
  control.drainAndShutdown(streamCompletion, ec)
}
override def isShutdown: CompletionStage[Done] = {
  // Exposes the wrapped control's shutdown signal as-is.
  control.isShutdown
}
override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] = {
  // Metrics are read from the wrapped control; nothing is cached or transformed here.
  control.getMetrics
}
}
/**
* Combine the consumer control and a stream completion signal materialized values into
* one, so that the stream can be stopped in a controlled way without losing
* commits.
*
* For use in `mapMaterializedValue`.
*/
def createDrainingControl[T](pair: Pair[Control, CompletionStage[T]]): DrainingControl[T] =
  // The result type is declared explicitly so this public overload has a stable signature
  // that matches the two-argument `createDrainingControl` overload.
  // `pair.first` is the consumer control, `pair.second` the stream completion signal.
  new DrainingControl[T](pair.first, pair.second)
/**
* Combine the consumer control and a stream completion signal materialized values into
* one, so that the stream can be stopped in a controlled way without losing
* commits.
*
* For use in the `toMat` combination of materialized values.
*/
def createDrainingControl[T](c: Control, mat: CompletionStage[T]): DrainingControl[T] = {
  // Two-argument form: pairs the control `c` with the completion signal `mat`
  // in a single DrainingControl.
  new DrainingControl[T](c, mat)
}
/**
* An implementation of `Control` to be used as an empty value; all of its methods return
* a failed `CompletionStage`.
*/
def createNoopControl(): Control = {
  // Adapt the Scala DSL's no-op control to the Java `Control` interface.
  val scalaNoop = scaladsl.Consumer.NoopControl
  new ConsumerControlAsJava(scalaNoop)
}
/**
* The `plainSource` emits `ConsumerRecord` elements (as received from the underlying `KafkaConsumer`).
* It has no support for committing offsets to Kafka. It can be used when the offset is stored externally
* or with auto-commit (note that auto-commit is by default disabled).
*
* The consumer application doesn't need to use Kafka's built-in offset storage and can store offsets in a store of its own
* choosing. The primary use case for this is allowing the application to store both the offset and the results of the
* consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
* possible, but when it is, it will make the consumption fully atomic and give "exactly once" semantics that are
* stronger than the "at-least once" semantics you get with Kafka's offset commit functionality.
*/
def plainSource[K, V](settings: ConsumerSettings[K, V],
    subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {
  // Build the Scala DSL source first ...
  val scalaSource = scaladsl.Consumer.plainSource(settings, subscription)
  // ... then wrap its materialized control for Java and convert the source itself.
  scalaSource.mapMaterializedValue(ConsumerControlAsJava.apply).asJava
}
/**
* The `committableSource` makes it possible to commit offset positions to Kafka.
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
* If you commit the offset before processing the message you get "at-most once delivery" semantics,
* and for that there is a [[#atMostOnceSource]].
*
* Compared to auto-commit, this gives exact control over when a message is considered consumed.
*
* If you need to store offsets in anything other than Kafka, [[#plainSource]] should be used
* instead of this API.
*/
def committableSource[K, V](settings: ConsumerSettings[K, V],
    subscription: Subscription): Source[CommittableMessage[K, V], Control] = {
  // Delegate to the Scala DSL implementation.
  val scalaSource = scaladsl.Consumer.committableSource(settings, subscription)
  // Expose the materialized control through the Java adapter, then convert to the Java DSL.
  val withJavaControl = scalaSource.mapMaterializedValue(ConsumerControlAsJava.apply)
  withJavaControl.asJava
}
/**
* API MAY CHANGE
*
* This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
* to commit offset positions to Kafka.
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
* It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Producer.flowWithContext]].
*/
@ApiMayChange
def sourceWithOffsetContext[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription): SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] = {
  // TODO this could use `scaladsl committableSourceWithContext` but `mapMaterializedValue` is not available, yet
  // See https://github.com/akka/akka/issues/26836
  val offsetContextStage = new SourceWithOffsetContext[K, V](settings, subscription)
  val pairedSource = pekko.stream.scaladsl.Source
    .fromGraph(offsetContextStage)
    .mapMaterializedValue(ConsumerControlAsJava.apply)
  // Each element has two components: the second becomes the flow context,
  // the first stays as the element that flows downstream.
  pairedSource
    .asSourceWithContext(elem => elem._2)
    .map(elem => elem._1)
    .asJava
}
/**
* API MAY CHANGE
*
* This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
* to commit offset positions to Kafka.
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
* It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Producer.flowWithContext]].
*
* This variant makes it possible to add additional metadata (in the form of a string)
* when an offset is committed based on the record. This can be useful (for example) to store information about which
* node made the commit, what time the commit was made, the timestamp of the record etc.
*/
@ApiMayChange
def sourceWithOffsetContext[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription,
    metadataFromRecord: java.util.function.Function[ConsumerRecord[K, V], String])
    : SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] = {
  // TODO this could use `scaladsl committableSourceWithContext` but `mapMaterializedValue` is not available, yet
  // See https://github.com/akka/akka/issues/26836
  // The Java `Function` is bridged to a Scala function by calling its `apply` explicitly.
  val offsetContextStage = new SourceWithOffsetContext[K, V](
    settings,
    subscription,
    (rec: ConsumerRecord[K, V]) => metadataFromRecord.apply(rec))
  val pairedSource = pekko.stream.scaladsl.Source
    .fromGraph(offsetContextStage)
    .mapMaterializedValue(ConsumerControlAsJava.apply)
  // Second component of each element becomes the flow context; the first is emitted downstream.
  pairedSource
    .asSourceWithContext(elem => elem._2)
    .map(elem => elem._1)
    .asJava
}
/**
* The `commitWithMetadataSource` makes it possible to add additional metadata (in the form of a string)
* when an offset is committed based on the record. This can be useful (for example) to store information about which
* node made the commit, what time the commit was made, the timestamp of the record etc.
*/
def commitWithMetadataSource[K, V](
    settings: ConsumerSettings[K, V],
    subscription: Subscription,
    metadataFromRecord: java.util.function.Function[ConsumerRecord[K, V], String])
    : Source[CommittableMessage[K, V], Control] = {
  // The Java `Function` is adapted inline to a Scala function for the Scala DSL call.
  val scalaSource = scaladsl.Consumer.commitWithMetadataSource(
    settings,
    subscription,
    (rec: ConsumerRecord[K, V]) => metadataFromRecord.apply(rec))
  // Wrap the materialized control for Java and convert the source to the Java DSL.
  scalaSource.mapMaterializedValue(ConsumerControlAsJava.apply).asJava
}
/**
* Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
* before being emitted downstream.
*/
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
    subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {
  // Delegate to the Scala DSL implementation of the at-most-once source.
  val scalaSource = scaladsl.Consumer.atMostOnceSource(settings, subscription)
  // Adapt the materialized control for Java before converting the source itself.
  val withJavaControl = scalaSource.mapMaterializedValue(ConsumerControlAsJava.apply)
  withJavaControl.asJava
}
/**
* The `plainPartitionedSource` is a way to track automatic partition assignment from Kafka.
* When a topic-partition is assigned to a consumer, this source will emit pairs with the assigned topic-partition and a corresponding
* source of `ConsumerRecord`s.
* When a topic-partition is revoked, the corresponding source completes.
*/
def plainPartitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
scaladsl.Consumer
.plainPartitionedSource(settings, subscription)
.map {