in packages/dubbo-web/src/grpc-web-transport.ts [117:333]
export function createGrpcWebTransport(
options: GrpcWebTransportOptions
): Transport {
assertFetchApi();
const useBinaryFormat = options.useBinaryFormat ?? true;
const fetch = options.fetch ?? globalThis.fetch;
return {
async unary<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: Headers,
message: PartialMessage<I>
): Promise<UnaryResponse<I, O>> {
const { normalize, serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
return await runUnaryCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: false,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(useBinaryFormat, timeoutMs, header),
message: normalize(message),
},
next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
const response = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: encodeEnvelope(0, serialize(req.message)),
});
validateResponse(response.status, response.headers);
if (!response.body) {
throw "missing response body";
}
const reader = createEnvelopeReadableStream(
response.body
).getReader();
let trailer: Headers | undefined;
let message: O | undefined;
for (;;) {
const r = await reader.read();
if (r.done) {
break;
}
const { flags, data } = r.value;
if (flags === trailerFlag) {
if (trailer !== undefined) {
throw "extra trailer";
}
// Unary responses require exactly one response message, but in
// case of an error, it is perfectly valid to have a response body
// that only contains error trailers.
trailer = trailerParse(data);
continue;
}
if (message !== undefined) {
throw "extra message";
}
message = parse(data);
}
if (trailer === undefined) {
throw "missing trailer";
}
validateTrailer(trailer);
if (message === undefined) {
throw "missing message";
}
return <UnaryResponse<I, O>>{
stream: false,
header: response.headers,
message,
trailer,
};
},
});
},
async stream<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: HeadersInit | undefined,
input: AsyncIterable<I>
): Promise<StreamResponse<I, O>> {
const { serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
async function* parseResponseBody(
body: ReadableStream<Uint8Array>,
foundStatus: boolean,
trailerTarget: Headers
) {
const reader = createEnvelopeReadableStream(body).getReader();
if (foundStatus) {
// A grpc-status: 0 response header was present. This is a "trailers-only"
// response (a response without a body and no trailers).
//
// The spec seems to disallow a trailers-only response for status 0 - we are
// lenient and only verify that the body is empty.
//
// > [...] Trailers-Only is permitted for calls that produce an immediate error.
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
if (!(await reader.read()).done) {
throw "extra data for trailers-only";
}
return;
}
let trailerReceived = false;
for (;;) {
const result = await reader.read();
if (result.done) {
break;
}
const { flags, data } = result.value;
if ((flags & trailerFlag) === trailerFlag) {
if (trailerReceived) {
throw "extra trailer";
}
trailerReceived = true;
const trailer = trailerParse(data);
validateTrailer(trailer);
trailer.forEach((value, key) => trailerTarget.set(key, value));
continue;
}
if (trailerReceived) {
throw "extra message";
}
yield parse(data);
continue;
}
if (!trailerReceived) {
throw "missing trailer";
}
}
async function createRequestBody(
input: AsyncIterable<I>
): Promise<Uint8Array> {
if (method.kind != MethodKind.ServerStreaming) {
throw "The fetch API does not support streaming request bodies";
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw "missing request message";
}
return encodeEnvelope(0, serialize(r.value));
}
return runStreamingCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: true,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(useBinaryFormat, timeoutMs, header),
message: input,
},
next: async (req) => {
const fRes = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
});
const { foundStatus } = validateResponse(fRes.status, fRes.headers);
if (!fRes.body) {
throw "missing response body";
}
const trailer = new Headers();
const res: StreamResponse<I, O> = {
...req,
header: fRes.headers,
trailer,
message: parseResponseBody(fRes.body, foundStatus, trailer),
};
return res;
},
});
},
};
}