def responseToSource[O]()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala [196:272]


  def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
      implicit ec: ExecutionContext,
      mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = {
    Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => {
      response.map { response =>
        {
          if (response.status != StatusCodes.OK) {
            response.entity.discardBytes()
            val failure = mapToStatusException(response, immutable.Seq.empty)
            Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure))
          } else {
            Codecs.detect(response) match {
              case Success(codec) =>
                implicit val reader: GrpcProtocolReader = GrpcProtocolNative.newReader(codec)
                val trailerPromise = Promise[immutable.Seq[HttpHeader]]()
                // Completed with success or failure based on grpc-status and grpc-message trailing headers
                val completionFuture: Future[Unit] =
                  trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers))

                val responseData =
                  response.entity match {
                    case Chunked(_, chunks) =>
                      chunks
                        .map {
                          case Chunk(data, _) =>
                            data
                          case LastChunk(_, trailer) =>
                            trailerPromise.success(trailer)
                            ByteString.empty
                        }
                        .watchTermination()((_, done) =>
                          done.onComplete(_ => trailerPromise.trySuccess(immutable.Seq.empty)))
                    case Strict(_, data) =>
                      val rawTrailers =
                        response.attribute(AttributeKeys.trailer).map(_.headers).getOrElse(immutable.Seq.empty)
                      val trailers = rawTrailers.map(h => RawHeader(h._1, h._2))
                      trailerPromise.success(trailers)
                      Source.single[ByteString](data)
                    case _ =>
                      response.entity.discardBytes()
                      throw mapToStatusException(response, Seq.empty)
                  }
                responseData
                  // This never adds any data to the stream, but makes sure it fails with the correct error code if applicable
                  .concat(
                    Source
                      .maybe[ByteString]
                      .mapMaterializedValue(promise => promise.completeWith(completionFuture.map(_ => None))))
                  // Make sure we continue reading to get the trailing header even if we're no longer interested in the rest of the body
                  .via(new CancellationBarrierGraphStage)
                  .via(reader.dataFrameDecoder)
                  .map(deserializer.deserialize)
                  .mapMaterializedValue(_ =>
                    Future.successful(new GrpcResponseMetadata() {
                      override def headers: pekko.grpc.scaladsl.Metadata =
                        new HeaderMetadataImpl(response.headers)

                      override def getHeaders(): pekko.grpc.javadsl.Metadata =
                        new JavaMetadataImpl(new HeaderMetadataImpl(response.headers))

                      override def trailers: Future[pekko.grpc.scaladsl.Metadata] =
                        trailerPromise.future.map(new HeaderMetadataImpl(_))

                      override def getTrailers(): CompletionStage[pekko.grpc.javadsl.Metadata] =
                        trailerPromise.future
                          .map[pekko.grpc.javadsl.Metadata](h =>
                            new JavaMetadataImpl(new HeaderMetadataImpl(h)))
                          .asJava
                    }))
              case Failure(e) =>
                Source.failed[O](e).mapMaterializedValue(_ => Future.failed(e))
            }
          }
        }
      }
    })
  }.mapMaterializedValue(_.flatten)