def attachControl[A, B]()

in testkit/src/main/scala/org/apache/pekko/kafka/testkit/javadsl/ConsumerControlFactory.scala [34:70]


  /**
   * Routes `source` through [[controlFlow]] and keeps only the flow's materialized
   * `Consumer.Control`; the materialized value `B` of `source` is discarded.
   *
   * @param source the source to attach a control to
   * @tparam A element type emitted by `source`
   * @tparam B materialized value type of `source` (dropped)
   * @return a source emitting the same element type that materializes a `Consumer.Control`
   */
  def attachControl[A, B](source: Source[A, B]): Source[A, Consumer.Control] = {
    val controlled: Flow[A, A, Consumer.Control] = controlFlow[A]()
    source.viaMat(controlled, Keep.right[B, Consumer.Control])
  }

  /**
   * Builds a Java DSL flow from an identity `Flow[A]` joined with `KillSwitches.single`,
   * whose materialized kill switch is wrapped into a `Consumer.Control` via [[control]].
   *
   * @tparam A element type flowing through
   * @return a flow from `A` to `A` materializing a `Consumer.Control`
   */
  def controlFlow[A](): Flow[A, A, Consumer.Control] = {
    // Keep.right retains the kill switch as the materialized value of the combined flow.
    val switched = scaladsl.Flow[A].viaMat(KillSwitches.single[A])(scaladsl.Keep.right)
    val withControl = switched.mapMaterializedValue(switch => control(switch))
    withControl.asJava
  }

  /**
   * Wraps `killSwitch` in a new [[FakeControl]].
   *
   * @param killSwitch the switch the returned control operates on
   * @return a `Consumer.Control` backed by `killSwitch`
   */
  def control(killSwitch: KillSwitch): Consumer.Control = {
    val fake: Consumer.Control = new FakeControl(killSwitch)
    fake
  }

  /**
   * `Consumer.Control` implementation driven solely by the supplied `KillSwitch`.
   *
   * @param killSwitch switch that is shut down when this control is stopped
   */
  class FakeControl(val killSwitch: KillSwitch) extends Consumer.Control {

    /** Completed with `Done` by `stop()`; also exposed through `isShutdown`. */
    val shutdownPromise: CompletableFuture[Done] = new CompletableFuture[Done]()

    /**
     * Shuts the kill switch down and completes `shutdownPromise`.
     *
     * @return `shutdownPromise`, which is already completed when this method returns
     */
    override def stop(): CompletionStage[Done] = {
      killSwitch.shutdown()
      // CompletableFuture.complete has no effect on an already completed future.
      shutdownPromise.complete(Done)
      shutdownPromise
    }

    /** Delegates to `stop()`; this fake does not distinguish stopping from shutting down. */
    override def shutdown(): CompletionStage[Done] = stop()

    /** @return the stage that `stop()` completes */
    override def isShutdown: CompletionStage[Done] = shutdownPromise

    /**
     * This fake holds no metrics source, so it reports an empty, immutable metrics map
     * in an already completed stage rather than failing the caller.
     */
    override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] =
      CompletableFuture.completedFuture[java.util.Map[MetricName, Metric]](
        java.util.Collections.emptyMap[MetricName, Metric]())

    /**
     * Calls `stop()` and then hands over to `streamCompletion`.
     *
     * @param streamCompletion completion of the stream this control belongs to
     * @param ec not used by this implementation
     * @return a stage that follows `streamCompletion` once `stop()` has completed
     */
    override def drainAndShutdown[T](
        streamCompletion: CompletionStage[T],
        ec: Executor): CompletionStage[T] = {
      val awaitStream: java.util.function.Function[Done, CompletionStage[T]] =
        new java.util.function.Function[Done, CompletionStage[T]] {
          override def apply(done: Done): CompletionStage[T] = streamCompletion
        }
      stop().thenCompose[T](awaitStream)
    }

  }