in core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala [265:277]
private def checkForCompletion(): Unit =
if (isClosed(stage.in))
if (awaitingCommitsBeforeShutdown()) {
upstreamCompletionState match {
case Some(Success(_)) =>
completeStage()
streamCompletion.success(Done)
case Some(Failure(ex)) =>
closeAndFailStage(ex)
case None =>
closeAndFailStage(new IllegalStateException("Stage completed, but there is no info about status"))
}
} else