in runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala [196:272]
/**
 * Builds a source of deserialized messages from an HTTP response that is still being awaited.
 *
 * The source is created through `Source.lazyFutureSource`, and its materialized value is the flattened
 * future of the [[GrpcResponseMetadata]] produced by the inner source.
 *
 * Outcomes, by response shape:
 *  - status other than `StatusCodes.OK`: the entity is discarded, and both the stream and the materialized
 *    future fail with the exception produced by `mapToStatusException`;
 *  - codec detection failure: both the stream and the materialized future fail with that error;
 *  - `Chunked` entity: data chunks are forwarded, and the trailer of the last chunk completes the trailers;
 *    if the chunk stream terminates without having done so, the trailers are completed as empty;
 *  - `Strict` entity: trailers are taken from the `AttributeKeys.trailer` response attribute (empty when
 *    the attribute is absent) and the payload is emitted as a single element;
 *  - any other entity: the entity is discarded and the exception from `mapToStatusException` is thrown
 *    from inside the `response.map` callback.
 *
 * @param response     the HTTP response to decode, once available
 * @param deserializer turns each decoded data frame into an `O`
 * @param ec           execution context used for the future transformations in this method
 * @param mat          materializer required implicitly by the stream and entity operations used here
 * @return a source of decoded messages that materializes the response metadata (headers and trailers)
 */
def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
    implicit ec: ExecutionContext,
    mat: Materializer): Source[O, Future[GrpcResponseMetadata]] =
  Source
    .lazyFutureSource[O, Future[GrpcResponseMetadata]](() =>
      response.map { httpResponse =>
        if (httpResponse.status == StatusCodes.OK) {
          Codecs.detect(httpResponse) match {
            case Failure(cause) =>
              Source.failed[O](cause).mapMaterializedValue(_ => Future.failed(cause))

            case Success(codec) =>
              implicit val reader: GrpcProtocolReader = GrpcProtocolNative.newReader(codec)
              val trailersPromise = Promise[immutable.Seq[HttpHeader]]()
              // Succeeds or fails depending on the grpc-status and grpc-message trailing headers
              val statusCheck: Future[Unit] =
                trailersPromise.future.flatMap(received => parseResponseStatus(httpResponse, received))

              val payload =
                httpResponse.entity match {
                  case Chunked(_, parts) =>
                    parts
                      .map {
                        case Chunk(bytes, _) =>
                          bytes
                        case LastChunk(_, trailer) =>
                          // The final chunk carries the trailers and contributes no payload bytes
                          trailersPromise.success(trailer)
                          ByteString.empty
                      }
                      // If the chunk stream ends without a last chunk having completed the promise,
                      // fall back to empty trailers
                      .watchTermination()((_, done) =>
                        done.onComplete(_ => trailersPromise.trySuccess(immutable.Seq.empty)))
                  case Strict(_, bytes) =>
                    val attributeTrailers =
                      httpResponse.attribute(AttributeKeys.trailer).map(_.headers).getOrElse(immutable.Seq.empty)
                    val trailerHeaders = attributeTrailers.map(entry => RawHeader(entry._1, entry._2))
                    trailersPromise.success(trailerHeaders)
                    Source.single[ByteString](bytes)
                  case _ =>
                    httpResponse.entity.discardBytes()
                    throw mapToStatusException(httpResponse, Seq.empty)
                }

              // Contributes no elements of its own; it only completes or fails together with statusCheck,
              // so the stream fails with the correct error code if applicable
              val statusTail =
                Source
                  .maybe[ByteString]
                  .mapMaterializedValue(promise => promise.completeWith(statusCheck.map(_ => None)))

              // Exposes the response headers immediately and the trailers once they are known
              def responseMetadata(): GrpcResponseMetadata =
                new GrpcResponseMetadata() {
                  override def headers: pekko.grpc.scaladsl.Metadata =
                    new HeaderMetadataImpl(httpResponse.headers)
                  override def getHeaders(): pekko.grpc.javadsl.Metadata =
                    new JavaMetadataImpl(new HeaderMetadataImpl(httpResponse.headers))
                  override def trailers: Future[pekko.grpc.scaladsl.Metadata] =
                    trailersPromise.future.map(new HeaderMetadataImpl(_))
                  override def getTrailers(): CompletionStage[pekko.grpc.javadsl.Metadata] =
                    trailersPromise.future
                      .map[pekko.grpc.javadsl.Metadata](received =>
                        new JavaMetadataImpl(new HeaderMetadataImpl(received)))
                      .asJava
                }

              payload
                .concat(statusTail)
                // Keep reading so the trailing header is still received even when downstream
                // is no longer interested in the rest of the body
                .via(new CancellationBarrierGraphStage)
                .via(reader.dataFrameDecoder)
                .map(deserializer.deserialize)
                .mapMaterializedValue(_ => Future.successful(responseMetadata()))
          }
        } else {
          httpResponse.entity.discardBytes()
          val statusFailure = mapToStatusException(httpResponse, immutable.Seq.empty)
          Source.failed(statusFailure).mapMaterializedValue(_ => Future.failed(statusFailure))
        }
      })
    .mapMaterializedValue(_.flatten)