in runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala [112:159]
/**
 * Auxiliary constructor taking only the descriptor, channel, default call options and
 * client settings. It forwards those four values unchanged to the five-argument
 * constructor and supplies `MetadataImpl.empty` as the last argument.
 *
 * NOTE(review): the delegated constructor is declared outside this excerpt; the last
 * argument is presumably the initial request headers - confirm.
 */
def this(
    descriptor: MethodDescriptor[I, O],
    channel: InternalChannel,
    defaultOptions: CallOptions,
    settings: GrpcClientSettings)(implicit mat: Materializer, ec: ExecutionContext) = {
  this(descriptor, channel, defaultOptions, settings, MetadataImpl.empty)
}
/**
 * Deprecated auxiliary constructor that still accepts a fully-qualified method name.
 * Delegates to the five-argument constructor with `MetadataImpl.empty` as the last
 * argument.
 *
 * @param fqMethodName ignored: it is not forwarded to the delegated constructor (per the
 *                     deprecation message it can be derived from `descriptor`)
 */
@deprecated("fqMethodName was removed since it can be derived from the descriptor", "akka-grpc 1.1.0")
@InternalStableApi
def this(
    descriptor: MethodDescriptor[I, O],
    fqMethodName: String,
    channel: InternalChannel,
    defaultOptions: CallOptions,
    settings: GrpcClientSettings)(implicit mat: Materializer, ec: ExecutionContext) = {
  this(descriptor, channel, defaultOptions, settings, MetadataImpl.empty)
}
/**
 * Produces the `CallOptions` for one call by passing this builder's `defaultOptions` and
 * `settings` to `NettyClientUtils.callOptionsWithDeadline`.
 *
 * NOTE(review): judging by the helper's name it applies a deadline taken from the
 * settings - confirm against NettyClientUtils.
 */
private def callOptionsWithDeadline(): CallOptions = {
  val perCallOptions: CallOptions =
    NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
  perCallOptions
}
/**
 * Sends the request stream and completes with the response message only.
 *
 * Delegates to `invokeWithMetadata` and keeps just the `value` of the resulting
 * `GrpcSingleResponse`; the projection runs on the parasitic execution context.
 *
 * @param request the stream of request messages
 * @return a future of the response message
 */
override def invoke(request: Source[I, NotUsed]): Future[O] = {
  val responseWithMetadata = invokeWithMetadata(request)
  responseWithMetadata.map(response => response.value)(ExecutionContexts.parasitic)
}
/**
 * Sends the request stream and completes with the single response message together with
 * the response metadata (headers and trailers).
 *
 * The call is issued through `channel.invokeWithMetadata`, the resulting stream is run
 * with `Sink.head`, and the materialized metadata future is combined with the future of
 * the first element into one `GrpcSingleResponse`.
 *
 * @param source the stream of request messages
 * @return a future that completes once both the metadata and the response are available
 */
override def invokeWithMetadata(source: Source[I, NotUsed]): Future[GrpcSingleResponse[O]] = {
  // Exposes the payload and the response metadata through the GrpcSingleResponse accessors.
  def toSingleResponse(metadata: GrpcResponseMetadata, payload: O): GrpcSingleResponse[O] =
    new GrpcSingleResponse[O] {
      def value: O = payload
      def getValue(): O = payload
      def headers = metadata.headers
      def getHeaders() = metadata.getHeaders()
      def trailers = metadata.trailers
      def getTrailers() = metadata.getTrailers()
    }

  // The single response is represented as a stream here, which adds a bit of overhead.
  // NOTE(review): the `false` flag presumably means "response is not streaming" - confirm
  // against InternalChannel.invokeWithMetadata.
  val responses =
    channel.invokeWithMetadata(source, headers, descriptor, false, callOptionsWithDeadline())

  // Continue reading to get the trailing headers
  val runnable = responses.via(new CancellationBarrierGraphStage).toMat(Sink.head)(Keep.both)

  val (metadataFuture: Future[GrpcResponseMetadata], payloadFuture: Future[O]) = runnable.run()

  metadataFuture.zip(payloadFuture).map { case (metadata, payload) =>
    toSingleResponse(metadata, payload)
  }(ExecutionContexts.parasitic)
}