in runtime/src/main/scala/org/apache/pekko/grpc/javadsl/GrpcMarshalling.scala [45:73]
def unmarshal[T](
data: Source[ByteString, AnyRef],
u: ProtobufSerializer[T],
mat: Materializer,
reader: GrpcProtocolReader): CompletionStage[T] =
data.via(reader.dataFrameDecoder).map(u.deserialize).runWith(SingleParameterSink.create[T](), mat)
def unmarshal[T](
entity: HttpEntity,
u: ProtobufSerializer[T],
mat: Materializer,
reader: GrpcProtocolReader): CompletionStage[T] =
unmarshal(entity.getDataBytes, u, mat, reader)
def unmarshalStream[T](
data: Source[ByteString, AnyRef],
u: ProtobufSerializer[T],
@nowarn("msg=is never used") mat: Materializer,
reader: GrpcProtocolReader): CompletionStage[Source[T, NotUsed]] = {
CompletableFuture.completedFuture[Source[T, NotUsed]](
data
.mapMaterializedValue(_ => NotUsed)
.via(reader.dataFrameDecoder)
.map(japiFunction(u.deserialize))
// In gRPC we signal failure by returning an error code, so we
// don't want the cancellation bubbled out
.via(new CancellationBarrierGraphStage)
.mapMaterializedValue(japiFunction(_ => NotUsed)))
}