in testkit/src/main/scala/org/apache/pekko/kafka/testkit/javadsl/ConsumerControlFactory.scala [34:70]
def attachControl[A, B](source: Source[A, B]): Source[A, Consumer.Control] =
source
.viaMat(controlFlow(), Keep.right[B, Consumer.Control])
def controlFlow[A](): Flow[A, A, Consumer.Control] =
scaladsl
.Flow[A]
.viaMat(KillSwitches.single[A])(scaladsl.Keep.right)
.mapMaterializedValue(killSwitch => control(killSwitch))
.asJava
def control(killSwitch: KillSwitch): Consumer.Control = new FakeControl(killSwitch)
class FakeControl(val killSwitch: KillSwitch) extends Consumer.Control {
val shutdownPromise: CompletableFuture[Done] = new CompletableFuture[Done]()
override def stop(): CompletionStage[Done] = {
killSwitch.shutdown()
shutdownPromise.complete(Done)
shutdownPromise
}
override def shutdown(): CompletionStage[Done] = stop()
override def isShutdown: CompletionStage[Done] = shutdownPromise
override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] = ???
override def drainAndShutdown[T](
streamCompletion: CompletionStage[T],
ec: Executor): CompletionStage[T] =
stop().thenCompose(new java.util.function.Function[Done, CompletionStage[T]] {
override def apply(t: Done): CompletionStage[T] = streamCompletion
})
}