override def shape: SinkShape[T] = SinkShape.of()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/SingleParameterSink.scala [39:72]


  override def shape: SinkShape[T] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[T]) = {
    val p: Promise[T] = Promise()
    (
      new GraphStageLogic(shape) with InHandler {
        override def preStart(): Unit = pull(in)

        def onPush(): Unit = {
          p.success(grab(in))
          // We expect only a completion
          pull(in)
        }

        override def onUpstreamFinish(): Unit = {
          if (!p.isCompleted) {
            p.failure(new MissingParameterException())
          }
          completeStage()
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          p.tryFailure(ex)
          failStage(ex)
        }

        override def postStop(): Unit = {
          if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this))
        }

        setHandler(in, this)
      },
      p.future)
  }