in runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala [64:197]
def createLogicAndMaterializedValue(
inheritedAttributes: stream.Attributes): (GraphStageLogic, Future[GrpcResponseMetadata]) = {
import PekkoNettyGrpcClientGraphStage._
val matVal = Promise[GrpcResponseMetadata]()
val trailerPromise = Promise[Metadata]()
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
// this is here just to fail single response requests getting more responses
// duplicating behavior in io.grpc.stub.ClientCalls
var sawFirstElement = false
var requested = 0
// any here to avoid wrapping every incoming element
val callback = getAsyncCallback[Any] {
case msg: ControlMessage =>
msg match {
case ReadyForSending => if (!isClosed(in) && !hasBeenPulled(in)) tryPull(in)
case Closed(status, trailer) => onCallClosed(status, trailer)
}
case element: O @unchecked =>
if (!streamingResponse) {
if (sawFirstElement) {
throw new IllegalStateException("Got more than one messages back from to a non-streaming call")
} else sawFirstElement = true
}
emit(out, element)
requested -= 1
}
var call: ClientCall[I, O] = null
val listener = new ClientCall.Listener[O] {
private def makeResponseMetadata(metadata: Metadata) =
new GrpcResponseMetadata {
private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadata)
private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(metadata)
def headers: scaladsl.Metadata = sMetadata
def getHeaders(): javadsl.Metadata = jMetadata
private lazy val sTrailers =
trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
private lazy val jTrailers = trailerPromise.future
.map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
.asJava
def trailers: Future[scaladsl.Metadata] = sTrailers
def getTrailers(): CompletionStage[javadsl.Metadata] = jTrailers
}
override def onReady(): Unit =
callback.invoke(ReadyForSending)
override def onHeaders(responseHeaders: Metadata): Unit =
matVal.success(makeResponseMetadata(responseHeaders))
override def onMessage(message: O): Unit =
callback.invoke(message)
override def onClose(status: Status, trailers: Metadata): Unit = {
matVal.trySuccess(makeResponseMetadata(new Metadata()))
trailerPromise.success(trailers)
callback.invoke(Closed(status, trailers))
}
}
override def preStart(): Unit = {
call = channel.newCall(descriptor, options)
call.start(listener, headers.toGoogleGrpcMetadata())
// always pull early - pull 2 for non-streaming response "to trigger failure early"
// duplicating behavior in io.grpc.stub.ClientCalls - not sure why this is a good idea
val initialRequest = if (streamingResponse) 1 else 2
call.request(initialRequest)
requested = initialRequest
// give us a chance to deal with the call cancellation even when
// the up and downstreams are done
setKeepGoing(true)
// the netty client doesn't always start with an OnReady, but all calls has at least one
// request so pull early to get things going
pull(in)
}
override def onPush(): Unit = {
call.sendMessage(grab(in))
if (call.isReady && !hasBeenPulled(in)) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
call.halfClose()
if (isClosed(out)) {
call.cancel("Upstream completed and downstream has cancelled", null)
call = null
completeStage()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
call.cancel("Failure from upstream", ex)
call = null
failStage(ex)
}
override def onPull(): Unit =
if (requested == 0) {
call.request(1)
requested += 1
}
override def onDownstreamFinish(cause: Throwable): Unit =
if (isClosed(out)) {
call.cancel("Downstream cancelled", cause)
call = null
completeStage()
}
def onCallClosed(status: Status, trailers: Metadata): Unit = {
if (status.isOk()) {
// FIXME share trailers through matval
completeStage()
} else {
failStage(status.asRuntimeException(trailers))
}
call = null
}
override def postStop(): Unit = {
if (call != null) {
call.cancel("Abrupt stream termination", null)
call = null
}
if (!matVal.isCompleted) {
matVal.tryFailure(new AbruptStageTerminationException(this))
}
}
setHandlers(in, out, this)
}
(logic, matVal.future)
}