override def stop()

in core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala [90:267]


    override def stop(): CompletionStage[Done] = control.stop()

    override def shutdown(): CompletionStage[Done] = control.shutdown()

    override def drainAndShutdown[S](streamCompletion: CompletionStage[S], ec: Executor): CompletionStage[S] =
      control.drainAndShutdown(streamCompletion, ec)

    /**
     * Stop producing messages from the `Source`, wait for stream completion
     * and shut down the consumer `Source`. It will wait for outstanding offset
     * commit requests to finish before shutting down.
     */
    def drainAndShutdown(ec: Executor): CompletionStage[T] =
      control.drainAndShutdown(streamCompletion, ec)

    override def isShutdown: CompletionStage[Done] = control.isShutdown

    override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] = control.getMetrics
  }
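
  // A minimal sketch of a controlled shutdown through a `DrainingControl` (not part
  // of this API; `control` is assumed to be a materialized `DrainingControl[Done]`
  // and `system` the running `ActorSystem`):
  //
  //   val done: CompletionStage[Done] =
  //     control.drainAndShutdown(system.dispatcher)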

  /**
   * Combine the consumer control and a stream completion signal materialized values into
   * one, so that the stream can be stopped in a controlled way without losing
   * commits.
   *
   * For use in `mapMaterializedValue`.
   */
  def createDrainingControl[T](pair: Pair[Control, CompletionStage[T]]): DrainingControl[T] =
    new DrainingControl[T](pair.first, pair.second)
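
  // A minimal sketch of the `Pair` variant, assuming `settings`, `committerSettings`
  // and `system` exist and that `Subscriptions`, `ConsumerMessage`,
  // `javadsl.Committer` and `pekko.stream.javadsl.Keep` are imported:
  //
  //   val control: Consumer.DrainingControl[Done] =
  //     Consumer
  //       .committableSource(settings, Subscriptions.topics("topic1"))
  //       .map[ConsumerMessage.Committable](msg => msg.committableOffset)
  //       .toMat(Committer.sink(committerSettings), Keep.both[Consumer.Control, CompletionStage[Done]])
  //       .mapMaterializedValue(pair => Consumer.createDrainingControl(pair))
  //       .run(system)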

  /**
   * Combine the consumer control and a stream completion signal materialized values into
   * one, so that the stream can be stopped in a controlled way without losing
   * commits.
   *
   * For use in the `toMat` combination of materialized values.
   */
  def createDrainingControl[T](c: Control, mat: CompletionStage[T]): DrainingControl[T] = new DrainingControl[T](c, mat)
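
  // A minimal sketch of the `toMat` variant (same assumed names and imports as the
  // sketch above), combining the control and the sink's completion directly:
  //
  //   val control: Consumer.DrainingControl[Done] =
  //     Consumer
  //       .committableSource(settings, Subscriptions.topics("topic1"))
  //       .map[ConsumerMessage.Committable](msg => msg.committableOffset)
  //       .toMat(Committer.sink(committerSettings),
  //         (c, done) => Consumer.createDrainingControl(c, done))
  //       .run(system)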

  /**
   * An implementation of `Control` to be used as an empty value; all methods return
   * a failed `CompletionStage`.
   */
  def createNoopControl(): Control = new ConsumerControlAsJava(scaladsl.Consumer.NoopControl)
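
  // A sketch of one way to use it: seed a mutable reference so that shutdown code
  // can run safely before the stream has materialized (`runningControl` is a
  // hypothetical name for the control obtained at materialization):
  //
  //   val ref = new java.util.concurrent.atomic.AtomicReference[Consumer.Control](
  //     Consumer.createNoopControl())
  //   // later, once the stream is materialized:
  //   ref.set(runningControl)
  //   // shutdown may be invoked at any time; before materialization it fails fast
  //   ref.get().shutdown()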

  /**
   * The `plainSource` emits `ConsumerRecord` elements (as received from the underlying `KafkaConsumer`).
   * It has no support for committing offsets to Kafka. It can be used when the offset is stored externally
   * or with auto-commit (note that auto-commit is disabled by default).
   *
   * The consumer application doesn't need to use Kafka's built-in offset storage and can store offsets in a store of its
   * own choosing. The primary use case for this is allowing the application to store both the offset and the results of
   * the consumption in the same system, so that both the results and offsets are stored atomically. This is not always
   * possible, but when it is, it makes the consumption fully atomic and gives "exactly once" semantics that are
   * stronger than the "at-least once" semantics you get with Kafka's offset commit functionality.
   */
  def plainSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    scaladsl.Consumer
      .plainSource(settings, subscription)
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asJava
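
  // A minimal sketch of external offset storage with `plainSource`, assuming
  // hypothetical application methods `loadOffset()` and `saveOffset(long)` plus the
  // usual `settings` and `system`, with `pekko.stream.javadsl.Sink` imported:
  //
  //   val partition = new TopicPartition("topic1", 0)
  //   val control: Consumer.Control =
  //     Consumer
  //       .plainSource(settings, Subscriptions.assignmentWithOffset(partition, loadOffset()))
  //       .to(Sink.foreach(record => saveOffset(record.offset)))
  //       .run(system)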

  /**
   * The `committableSource` makes it possible to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will likely be
   * delivered one time but in failure cases could be duplicated.
   *
   * If you commit the offset before processing the message you get "at-most once delivery" semantics,
   * and for that there is a [[#atMostOnceSource]].
   *
   * Compared to auto-commit, this gives exact control over when a message is considered consumed.
   *
   * If you need to store offsets in anything other than Kafka, [[#plainSource]] should be used
   * instead of this API.
   */
  def committableSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[CommittableMessage[K, V], Control] =
    scaladsl.Consumer
      .committableSource(settings, subscription)
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asJava
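
  // A minimal at-least-once sketch: process the record, then emit its offset for
  // committing (`business` is a hypothetical method returning `CompletionStage[Done]`;
  // other assumed names as in the sketches above):
  //
  //   val control: Consumer.Control =
  //     Consumer
  //       .committableSource(settings, Subscriptions.topics("topic1"))
  //       .mapAsync[ConsumerMessage.Committable](1, msg =>
  //         business(msg.record).thenApply[ConsumerMessage.Committable](_ => msg.committableOffset))
  //       .to(Committer.sink(committerSettings))
  //       .run(system)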

  /**
   * API MAY CHANGE
   *
   * This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
   * to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will likely be
   * delivered one time, but could be duplicated in failure cases.
   *
   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
   * and [[Producer.flowWithContext]].
   */
  @ApiMayChange
  def sourceWithOffsetContext[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription): SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] =
    // TODO this could use `scaladsl committableSourceWithContext` but `mapMaterializedValue` is not available, yet
    // See https://github.com/akka/akka/issues/26836
    pekko.stream.scaladsl.Source
      .fromGraph(new SourceWithOffsetContext[K, V](settings, subscription))
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asSourceWithContext(_._2)
      .map(_._1)
      .asJava
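
  // A minimal sketch with offsets carried as context and committed by a
  // context-aware sink (assuming `Committer.sinkWithOffsetContext` from this
  // module's javadsl and the usual `settings`, `committerSettings`, `system`):
  //
  //   val control: Consumer.DrainingControl[Done] =
  //     Consumer
  //       .sourceWithOffsetContext(settings, Subscriptions.topics("topic1"))
  //       .map(record => record.value)
  //       .toMat(Committer.sinkWithOffsetContext(committerSettings),
  //         (c, done) => Consumer.createDrainingControl(c, done))
  //       .run(system)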

  /**
   * API MAY CHANGE
   *
   * This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
   * to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will likely be
   * delivered one time, but could be duplicated in failure cases.
   *
   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
   * and [[Producer.flowWithContext]].
   *
   * This variant makes it possible to add additional metadata (in the form of a string)
   * when an offset is committed based on the record. This can be useful (for example) to store information about which
   * node made the commit, what time the commit was made, the timestamp of the record, etc.
   */
  @ApiMayChange
  def sourceWithOffsetContext[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription,
      metadataFromRecord: java.util.function.Function[ConsumerRecord[K, V], String])
      : SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] =
    // TODO this could use `scaladsl committableSourceWithContext` but `mapMaterializedValue` is not available, yet
    // See https://github.com/akka/akka/issues/26836
    pekko.stream.scaladsl.Source
      .fromGraph(
        new SourceWithOffsetContext[K, V](settings,
          subscription,
          (record: ConsumerRecord[K, V]) => metadataFromRecord(record)))
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asSourceWithContext(_._2)
      .map(_._1)
      .asJava
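
  // The same sketch as above, but recording each record's timestamp as commit
  // metadata:
  //
  //   Consumer.sourceWithOffsetContext(settings, Subscriptions.topics("topic1"),
  //     record => String.valueOf(record.timestamp))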

  /**
   * The `commitWithMetadataSource` makes it possible to add additional metadata (in the form of a string)
   * when an offset is committed based on the record. This can be useful (for example) to store information about which
   * node made the commit, what time the commit was made, the timestamp of the record, etc.
   */
  def commitWithMetadataSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription,
      metadataFromRecord: java.util.function.Function[ConsumerRecord[K, V], String])
      : Source[CommittableMessage[K, V], Control] =
    scaladsl.Consumer
      .commitWithMetadataSource(settings, subscription, (record: ConsumerRecord[K, V]) => metadataFromRecord(record))
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asJava
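
  // A minimal sketch attaching commit metadata (here the record timestamp; assumed
  // names as in the sketches above):
  //
  //   val control: Consumer.Control =
  //     Consumer
  //       .commitWithMetadataSource(settings, Subscriptions.topics("topic1"),
  //         record => String.valueOf(record.timestamp))
  //       .map[ConsumerMessage.Committable](msg => msg.committableOffset)
  //       .to(Committer.sink(committerSettings))
  //       .run(system)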

  /**
   * Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
   * before the message is emitted downstream.
   */
  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    scaladsl.Consumer
      .atMostOnceSource(settings, subscription)
      .mapMaterializedValue(ConsumerControlAsJava.apply)
      .asJava
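
  // A minimal at-most-once sketch: the offset is committed before `business` (a
  // hypothetical handler) sees the record, so a crash may lose messages but never
  // duplicates them:
  //
  //   val control: Consumer.Control =
  //     Consumer
  //       .atMostOnceSource(settings, Subscriptions.topics("topic1"))
  //       .to(Sink.foreach(record => business(record)))
  //       .run(system)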

  /**
   * The `plainPartitionedSource` is a way to track automatic partition assignment from Kafka.
   * When a topic-partition is assigned to a consumer, this source will emit pairs with the assigned topic-partition and a corresponding
   * source of `ConsumerRecord`s.
   * When a topic-partition is revoked, the corresponding source completes.
   */
  def plainPartitionedSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
    scaladsl.Consumer
      .plainPartitionedSource(settings, subscription)
      .map {