def createLogicAndMaterializedValue()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala [64:197]


  /**
   * Creates the stage logic that drives a single gRPC client call, together with the
   * materialized future of the response metadata.
   *
   * The materialized future is completed with the response headers when `onHeaders` is
   * signalled, with empty headers when the call closes before any headers were seen, or failed
   * with an `AbruptStageTerminationException` when the stage stops before either happened.
   * Trailers are exposed through the metadata's `trailers` / `getTrailers()` and become
   * available once the call has been closed.
   *
   * @param inheritedAttributes attributes inherited from the enclosing graph (not read here)
   * @return the stage logic and the future response metadata
   */
  def createLogicAndMaterializedValue(
      inheritedAttributes: stream.Attributes): (GraphStageLogic, Future[GrpcResponseMetadata]) = {
    import PekkoNettyGrpcClientGraphStage._
    // Completed at most once, by whichever of onHeaders / onClose / postStop gets there first.
    val matVal = Promise[GrpcResponseMetadata]()
    // Completed with the trailers handed to onClose.
    val trailerPromise = Promise[Metadata]()

    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
      // this is here just to fail single response requests getting more responses
      // duplicating behavior in io.grpc.stub.ClientCalls
      var sawFirstElement = false
      // number of responses requested from the call that have not been delivered yet
      var requested = 0

      // any here to avoid wrapping every incoming element
      val callback = getAsyncCallback[Any] {
        case msg: ControlMessage =>
          msg match {
            // the call can accept more requests: pull upstream unless it is done or already pulled
            case ReadyForSending         => if (!isClosed(in) && !hasBeenPulled(in)) tryPull(in)
            case Closed(status, trailer) => onCallClosed(status, trailer)
          }
        case element: O @unchecked =>
          if (!streamingResponse) {
            if (sawFirstElement) {
              throw new IllegalStateException("Got more than one messages back from to a non-streaming call")
            } else sawFirstElement = true
          }
          emit(out, element)
          requested -= 1
      }

      // null once the call has been cancelled or closed, so postStop knows there is nothing to cancel
      var call: ClientCall[I, O] = null

      // Bridges gRPC call notifications into the stage: headers/trailers go to the promises,
      // everything that touches stage state goes through the async callback.
      val listener = new ClientCall.Listener[O] {
        // Wraps the raw gRPC metadata; conversions are lazy so they only run when accessed.
        private def makeResponseMetadata(metadata: Metadata) =
          new GrpcResponseMetadata {
            private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadata)
            private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(metadata)
            def headers: scaladsl.Metadata = sMetadata
            def getHeaders(): javadsl.Metadata = jMetadata

            private lazy val sTrailers =
              trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
            private lazy val jTrailers = trailerPromise.future
              .map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
              .asJava
            def trailers: Future[scaladsl.Metadata] = sTrailers
            def getTrailers(): CompletionStage[javadsl.Metadata] = jTrailers
          }
        override def onReady(): Unit =
          callback.invoke(ReadyForSending)
        override def onHeaders(responseHeaders: Metadata): Unit = {
          // trySuccess rather than success: postStop may already have failed matVal when the
          // stage stopped before headers arrived, and completing a promise twice with `success`
          // throws IllegalStateException. This mirrors the trySuccess used in onClose.
          matVal.trySuccess(makeResponseMetadata(responseHeaders))
          ()
        }
        override def onMessage(message: O): Unit =
          callback.invoke(message)
        override def onClose(status: Status, trailers: Metadata): Unit = {
          // no headers were seen before the call closed: materialize empty headers instead
          matVal.trySuccess(makeResponseMetadata(new Metadata()))
          trailerPromise.success(trailers)
          callback.invoke(Closed(status, trailers))
        }
      }
      override def preStart(): Unit = {
        call = channel.newCall(descriptor, options)
        call.start(listener, headers.toGoogleGrpcMetadata())

        // always pull early - pull 2 for non-streaming response "to trigger failure early"
        // duplicating behavior in io.grpc.stub.ClientCalls - not sure why this is a good idea
        val initialRequest = if (streamingResponse) 1 else 2
        call.request(initialRequest)
        requested = initialRequest

        // give us a chance to deal with the call cancellation even when
        // the up and downstreams are done
        setKeepGoing(true)

        // the netty client doesn't always start with an OnReady, but every call has at least one
        // request so pull early to get things going
        pull(in)
      }
      override def onPush(): Unit = {
        call.sendMessage(grab(in))
        // keep pulling while the call reports it is ready; otherwise wait for ReadyForSending
        if (call.isReady && !hasBeenPulled(in)) {
          pull(in)
        }
      }
      override def onUpstreamFinish(): Unit = {
        call.halfClose()
        // nobody is left to receive responses, so there is no point in keeping the call open
        if (isClosed(out)) {
          call.cancel("Upstream completed and downstream has cancelled", null)
          call = null
          completeStage()
        }
      }
      override def onUpstreamFailure(ex: Throwable): Unit = {
        call.cancel("Failure from upstream", ex)
        call = null
        failStage(ex)
      }

      override def onPull(): Unit =
        // only ask the call for another response when nothing is outstanding
        if (requested == 0) {
          call.request(1)
          requested += 1
        }
      override def onDownstreamFinish(cause: Throwable): Unit =
        if (isClosed(out)) {
          call.cancel("Downstream cancelled", cause)
          call = null
          completeStage()
        }

      // Invoked through the async callback once the listener has reported the call as closed:
      // an OK status completes the stage, anything else fails it with the status exception.
      def onCallClosed(status: Status, trailers: Metadata): Unit = {
        if (status.isOk()) {
          // FIXME share trailers through matval
          completeStage()
        } else {
          failStage(status.asRuntimeException(trailers))
        }
        call = null
      }

      override def postStop(): Unit = {
        // a non-null call here means none of the regular termination paths cleared it
        if (call != null) {
          call.cancel("Abrupt stream termination", null)
          call = null
        }
        // never leave the materialized future hanging if neither headers nor close were seen
        if (!matVal.isCompleted) {
          matVal.tryFailure(new AbruptStageTerminationException(this))
        }
      }

      setHandlers(in, out, this)
    }

    (logic, matVal.future)
  }