override def stop()

in core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala [166:267]


    override def stop(): Future[Done] = Future.failed(exception)
    override def shutdown(): Future[Done] = Future.failed(exception)
    override def isShutdown: Future[Done] = Future.failed(exception)
    override def metrics: Future[Map[MetricName, Metric]] = Future.failed(exception)
  }

  /**
   * The `plainSource` emits `ConsumerRecord` elements (as received from the underlying `KafkaConsumer`).
   * It has no support for committing offsets to Kafka. It can be used when the offset is stored externally
   * or with auto-commit (note that auto-commit is disabled by default).
   *
   * The consumer application doesn't need to use Kafka's built-in offset storage and can store offsets in a store of its own
   * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
   * consumption in the same system so that both the results and offsets are stored atomically. This is not always
   * possible, but when it is, it will make the consumption fully atomic and give "exactly once" semantics that are
   * stronger than the "at-least once" semantics you get with Kafka's offset commit functionality.
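   *
   * A minimal usage sketch (bootstrap servers, group id and topic below are placeholders, and an
   * implicit `ActorSystem` named `system` is assumed to be in scope):
   * {{{
   * import org.apache.kafka.common.serialization.StringDeserializer
   * import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions }
   * import org.apache.pekko.stream.scaladsl.Sink
   *
   * val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
   *   .withBootstrapServers("localhost:9092")
   *   .withGroupId("example-group")
   *
   * // offsets are not committed to Kafka; store them externally if the stream must resume
   * val control: Consumer.Control =
   *   Consumer
   *     .plainSource(consumerSettings, Subscriptions.topics("example-topic"))
   *     .map(record => record.value)
   *     .to(Sink.foreach(println))
   *     .run()
   * }}}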
   */
  def plainSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    Source.fromGraph(new PlainSource[K, V](settings, subscription))

  /**
   * The `committableSource` makes it possible to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will typically be
   * delivered once but may be duplicated in failure cases.
   *
   * If you commit the offset before processing the message you get "at-most once delivery" semantics,
   * and for that there is a [[#atMostOnceSource]].
   *
   * Compared to auto-commit, this gives exact control over when a message is considered consumed.
   *
   * If you need to store offsets in anything other than Kafka, [[#plainSource]] should be used
   * instead of this API.
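   *
   * A usage sketch for at-least-once processing (`consumerSettings`, the topic name and the
   * `businessLogic` function are placeholders; an implicit `ActorSystem` named `system` is assumed):
   * {{{
   * val control =
   *   Consumer
   *     .committableSource(consumerSettings, Subscriptions.topics("example-topic"))
   *     .mapAsync(parallelism = 4) { msg =>
   *       // process the record first, then pass its offset on for committing
   *       businessLogic(msg.record).map(_ => msg.committableOffset)
   *     }
   *     .toMat(Committer.sink(CommitterSettings(system)))(Consumer.DrainingControl.apply)
   *     .run()
   * }}}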
   */
  def committableSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[CommittableMessage[K, V], Control] =
    Source.fromGraph(new CommittableSource[K, V](settings, subscription))

  /**
   * API MAY CHANGE
   *
   * This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
   * to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will typically be
   * delivered once but may be duplicated in failure cases.
   *
   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
   * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
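   *
   * A consume-transform-produce sketch with the offset carried as context (topic names,
   * `consumerSettings` and `producerSettings` are placeholders):
   * {{{
   * val control =
   *   Consumer
   *     .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("source-topic"))
   *     .map(record => ProducerMessage.single(new ProducerRecord("target-topic", record.key, record.value)))
   *     .via(Producer.flowWithContext(producerSettings))
   *     .toMat(Committer.sinkWithOffsetContext(CommitterSettings(system)))(Consumer.DrainingControl.apply)
   *     .run()
   * }}}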
   */
  @ApiMayChange
  def sourceWithOffsetContext[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription): SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] =
    Source
      .fromGraph(new SourceWithOffsetContext[K, V](settings, subscription))
      .asSourceWithContext(_._2)
      .map(_._1)

  /**
   * API MAY CHANGE
   *
   * This source emits `ConsumerRecord` together with the offset position as flow context, thus making it possible
   * to commit offset positions to Kafka.
   * This is useful when "at-least once delivery" is desired, as each message will typically be
   * delivered once but may be duplicated in failure cases.
   *
   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
   * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
   *
   * This variant makes it possible to add additional metadata (in the form of a string)
   * when an offset is committed based on the record. This can be useful (for example) to store information about which
   * node made the commit, what time the commit was made, the timestamp of the record etc.
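   *
   * For example, a metadata extractor that records each record's timestamp could look like this
   * (purely illustrative, topic and settings are placeholders):
   * {{{
   * Consumer.sourceWithOffsetContext(
   *   consumerSettings,
   *   Subscriptions.topics("example-topic"),
   *   record => record.timestamp.toString)
   * }}}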
   */
  @ApiMayChange
  def sourceWithOffsetContext[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription,
      metadataFromRecord: ConsumerRecord[K, V] => String)
      : SourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control] =
    Source
      .fromGraph(new SourceWithOffsetContext[K, V](settings, subscription, metadataFromRecord))
      .asSourceWithContext(_._2)
      .map(_._1)

  /**
   * The `commitWithMetadataSource` makes it possible to add additional metadata (in the form of a string)
   * when an offset is committed based on the record. This can be useful (for example) to store information about which
   * node made the commit, what time the commit was made, the timestamp of the record etc.
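   *
   * A usage sketch (topic and the metadata string are illustrative; the string is stored in the
   * committed offset's metadata field):
   * {{{
   * Consumer.commitWithMetadataSource(
   *   consumerSettings,
   *   Subscriptions.topics("example-topic"),
   *   record => s"host=${java.net.InetAddress.getLocalHost.getHostName} ts=${record.timestamp}")
   * }}}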
   */
  def commitWithMetadataSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: Subscription,
      metadataFromRecord: ConsumerRecord[K, V] => String): Source[CommittableMessage[K, V], Control] =
    Source.fromGraph(new CommittableSource[K, V](settings, subscription, metadataFromRecord))

  /**
   * Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
   * before being emitted downstream.
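   *
   * A usage sketch (topic and `consumerSettings` are placeholders; note that committing every single
   * offset before emitting limits throughput):
   * {{{
   * val control =
   *   Consumer
   *     .atMostOnceSource(consumerSettings, Subscriptions.topics("example-topic"))
   *     .map(record => record.value)
   *     .to(Sink.foreach(println))
   *     .run()
   * }}}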
   */
  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
      subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    committableSource[K, V](settings, subscription).mapAsync(1) { m =>