in packages/dubbo/src/protocol-grpc-web/transport.ts [55:322]
/**
 * Builds a `Transport` whose calls are sent as HTTP POST requests through
 * `opt.httpClient`. Request messages are serialized into envelopes
 * (optionally compressed) and joined into the request body; the response body
 * is split back into envelopes, where an envelope carrying `trailerFlag`
 * holds the call's trailer instead of a message.
 *
 * @param opt - Shared transport options (base URL, HTTP client, interceptors,
 *   serialization, compression and size limits) read by both call types.
 * @returns An object implementing the `unary` and `stream` call methods.
 */
export function createTransport(opt: CommonTransportOptions): Transport {
  return {
    /**
     * Runs a call with exactly one request message and exactly one response
     * message.
     *
     * @throws ConnectError with `Code.InvalidArgument` when the response
     *   body contains more than one message, more than one trailer, no
     *   trailer, or no message.
     */
    async unary<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      message: PartialMessage<I>
    ): Promise<UnaryResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );
      // Accept either a ready-made message instance or a partial initializer.
      const requestMessage =
        message instanceof method.I ? message : new method.I(message);

      return await runUnaryCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: false,
          service,
          method,
          url,
          init: {},
          header: requestHeader,
          message: requestMessage,
        },
        next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
          // Request body: the single message -> envelope -> compression -> bytes.
          const requestBody = pipe(
            createAsyncIterable([req.message]),
            transformSerializeEnvelope(codecs.getI(opt.useBinaryFormat)),
            transformCompressEnvelope(
              opt.sendCompression,
              opt.compressMinBytes
            ),
            transformJoinEnvelopes(),
            {
              propagateDownStreamError: true,
            }
          );
          const httpResponse = await opt.httpClient({
            url: req.url,
            method: "POST",
            header: req.header,
            signal: req.signal,
            body: requestBody,
          });
          const { compression } = validateResponseWithCompression(
            opt.acceptCompression,
            httpResponse.status,
            httpResponse.header
          );
          // Drain the response body, collecting at most one message envelope
          // and at most one trailer envelope.
          const { trailer: responseTrailer, message: responseMessage } =
            await pipeTo(
              httpResponse.body,
              transformSplitEnvelope(opt.readMaxBytes),
              transformDecompressEnvelope(
                compression ?? null,
                opt.readMaxBytes
              ),
              transformParseEnvelope<O, Headers>(
                codecs.getO(opt.useBinaryFormat),
                trailerFlag,
                createTrailerSerialization()
              ),
              async (envelopes) => {
                let output: O | undefined;
                let trailers: Headers | undefined;
                for await (const envelope of envelopes) {
                  if (!envelope.end) {
                    if (output !== undefined) {
                      throw new ConnectError(
                        "protocol error: received extra output message for unary method",
                        Code.InvalidArgument
                      );
                    }
                    output = envelope.value;
                    continue;
                  }
                  if (trailers !== undefined) {
                    throw new ConnectError(
                      "protocol error: received extra trailer",
                      Code.InvalidArgument
                    );
                  }
                  trailers = envelope.value;
                }
                return { trailer: trailers, message: output };
              },
              {
                propagateDownStreamError: false,
              }
            );
          // The trailer is checked and validated before the message is
          // required, so a trailer problem is reported first.
          if (responseTrailer === undefined) {
            throw new ConnectError(
              "protocol error: missing trailer",
              Code.InvalidArgument
            );
          }
          validateTrailer(responseTrailer);
          if (responseMessage === undefined) {
            throw new ConnectError(
              "protocol error: missing output message for unary method",
              Code.InvalidArgument
            );
          }
          return {
            stream: false,
            service,
            method,
            header: httpResponse.header,
            message: responseMessage,
            trailer: responseTrailer,
          } as UnaryResponse<I, O>;
        },
      });
    },

    /**
     * Runs a call whose request and response are both sequences of messages.
     * Response messages are produced lazily as the returned `message`
     * iterable is consumed; the response `trailer` is filled in when the
     * trailer envelope is read.
     *
     * @throws ConnectError with `Code.InvalidArgument`, raised while
     *   iterating the response messages, when the body has data on a
     *   trailers-only response, a second trailer, a message after the
     *   trailer, or no trailer at all.
     */
    async stream<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      input: AsyncIterable<I>
    ): Promise<StreamResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );
      const normalizedInput = pipe(input, transformNormalizeMessage(method.I), {
        propagateDownStreamError: true,
      });

      return runStreamingCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: true,
          service,
          method,
          url,
          init: {
            method: "POST",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader,
          message: normalizedInput,
        },
        next: async (req: StreamRequest<I, O>) => {
          // Messages are normalized again here, on whatever iterable the
          // request carries by the time it reaches this point.
          const requestBody = pipe(
            req.message,
            transformNormalizeMessage(method.I),
            transformSerializeEnvelope(codecs.getI(opt.useBinaryFormat)),
            transformCompressEnvelope(
              opt.sendCompression,
              opt.compressMinBytes
            ),
            transformJoinEnvelopes(),
            { propagateDownStreamError: true }
          );
          const httpResponse = await opt.httpClient({
            url: req.url,
            method: "POST",
            header: req.header,
            signal: req.signal,
            body: requestBody,
          });
          const { compression, foundStatus: trailersOnly } =
            validateResponseWithCompression(
              opt.acceptCompression,
              httpResponse.status,
              httpResponse.header
            );
          const response: StreamResponse<I, O> = {
            ...req,
            header: httpResponse.header,
            trailer: new Headers(),
            message: pipe(
              httpResponse.body,
              transformSplitEnvelope(opt.readMaxBytes),
              transformDecompressEnvelope(
                compression ?? null,
                opt.readMaxBytes
              ),
              transformParseEnvelope(
                codecs.getO(opt.useBinaryFormat),
                trailerFlag,
                createTrailerSerialization()
              ),
              async function* (envelopes) {
                if (trailersOnly) {
                  // A grpc-status: 0 response header was present. This is a
                  // "trailers-only" response (a response without a body and
                  // no trailers).
                  //
                  // The spec seems to disallow a trailers-only response for
                  // status 0 - we are lenient and only verify that the body
                  // is empty.
                  //
                  // > [...] Trailers-Only is permitted for calls that produce an immediate error.
                  // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
                  const iterator = envelopes[Symbol.asyncIterator]();
                  const first = await iterator.next();
                  if (first.done !== true) {
                    throw new ConnectError(
                      "protocol error: extra data for trailers-only",
                      Code.InvalidArgument
                    );
                  }
                  return;
                }
                let sawTrailer = false;
                for await (const envelope of envelopes) {
                  if (!envelope.end) {
                    // A message envelope is only valid before the trailer.
                    if (sawTrailer) {
                      throw new ConnectError(
                        "protocol error: received extra message after trailer",
                        Code.InvalidArgument
                      );
                    }
                    yield envelope.value;
                  } else {
                    if (sawTrailer) {
                      throw new ConnectError(
                        "protocol error: received extra trailer",
                        Code.InvalidArgument
                      );
                    }
                    // Marked as seen before validation, matching the order
                    // in which a validation failure would surface.
                    sawTrailer = true;
                    validateTrailer(envelope.value);
                    // Copy the trailer fields onto the response object that
                    // was already handed to the caller.
                    envelope.value.forEach((value, key) =>
                      response.trailer.set(key, value)
                    );
                  }
                }
                if (!sawTrailer) {
                  throw new ConnectError(
                    "protocol error: missing trailer",
                    Code.InvalidArgument
                  );
                }
              },
              { propagateDownStreamError: true }
            ),
          };
          return response;
        },
      });
    },
  };
}