in runtime/src/main/scala/org/apache/pekko/grpc/internal/SingleParameterSink.scala [39:72]
/** The shape of this sink: a single inlet, `in`, and no outlets. */
override def shape: SinkShape[T] = SinkShape(in)
/**
 * Creates the stage logic together with the future that is this sink's materialized value.
 *
 * The future is completed:
 *  - successfully with the first element pushed by upstream;
 *  - with a `MissingParameterException` if upstream finishes before any element arrived;
 *  - with the upstream failure if upstream fails before any element arrived;
 *  - with an `AbruptStageTerminationException` if the stage stops while the future is
 *    still pending.
 *
 * If upstream pushes a second element the stage is failed with an `IllegalStateException`;
 * the future, already completed with the first element, is left untouched.
 *
 * @param inheritedAttributes attributes handed in at materialization; not read by this stage
 * @return the stage logic paired with the future of the single expected element
 */
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[T]) = {
  val p: Promise[T] = Promise()
  (
    new GraphStageLogic(shape) with InHandler {
      // Request the one element we are waiting for as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      def onPush(): Unit =
        if (p.isCompleted) {
          // More than one element arrived: fail explicitly with a descriptive error instead of
          // letting `p.success` throw on the already-completed promise from inside the handler.
          failStage(new IllegalStateException("Expected a single parameter, but received more than one element"))
        } else {
          p.success(grab(in))
          // We expect only a completion, but keep demand open so that it (or an
          // unexpected extra element) is actually delivered to this stage.
          pull(in)
        }

      override def onUpstreamFinish(): Unit = {
        // Upstream completed without ever emitting the element.
        if (!p.isCompleted) {
          p.failure(new MissingParameterException())
        }
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        // No-op on the promise if the element was already received.
        p.tryFailure(ex)
        failStage(ex)
      }

      override def postStop(): Unit = {
        // Last resort: never leave the materialized future pending once the stage is gone.
        if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this))
      }

      setHandler(in, this)
    },
    p.future)
}