in packages/dubbo-web/src/dubbo-transport.ts [122:328]
export function createConnectTransport(
options: ConnectTransportOptions
): Transport {
assertFetchApi();
const useBinaryFormat = options.useBinaryFormat ?? false;
const fetch = options.fetch ?? globalThis.fetch;
return {
async unary<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: HeadersInit | undefined,
message: PartialMessage<I>
): Promise<UnaryResponse<I, O>> {
const { normalize, serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
return await runUnaryCall<I, O>({
interceptors: options.interceptors,
signal,
timeoutMs,
req: {
stream: false,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(
method.kind,
useBinaryFormat,
timeoutMs,
header
),
message: normalize(message),
},
next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
const useGet =
options.useHttpGet === true &&
method.idempotency === MethodIdempotency.NoSideEffects;
let body: BodyInit | null = null;
if (useGet) {
req = transformConnectPostToGetRequest(
req,
serialize(req.message),
useBinaryFormat
);
} else {
body = serialize(req.message);
}
const response = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body,
});
const { isUnaryError, unaryError } = validateResponse(
method.kind,
response.status,
response.headers
);
if (isUnaryError) {
throw errorFromJson(
(await response.json()) as JsonValue,
appendHeaders(...trailerDemux(response.headers)),
unaryError
);
}
const [demuxedHeader, demuxedTrailer] = trailerDemux(
response.headers
);
return <UnaryResponse<I, O>>{
stream: false,
service,
method,
header: demuxedHeader,
message: useBinaryFormat
? parse(new Uint8Array(await response.arrayBuffer()))
: method.O.fromJson(
(await response.json()) as JsonValue,
getJsonOptions(options.jsonOptions)
),
trailer: demuxedTrailer,
};
},
});
},
async stream<
I extends Message<I> = AnyMessage,
O extends Message<O> = AnyMessage
>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
timeoutMs: number | undefined,
header: HeadersInit | undefined,
input: AsyncIterable<I>
): Promise<StreamResponse<I, O>> {
const { serialize, parse } = createClientMethodSerializers(
method,
useBinaryFormat,
options.jsonOptions,
options.binaryOptions
);
async function* parseResponseBody(
body: ReadableStream<Uint8Array>,
trailerTarget: Headers
) {
const reader = createEnvelopeReadableStream(body).getReader();
let endStreamReceived = false;
for (;;) {
const result = await reader.read();
if (result.done) {
break;
}
const { flags, data } = result.value;
if ((flags & endStreamFlag) === endStreamFlag) {
endStreamReceived = true;
const endStream = endStreamFromJson(data);
if (endStream.error) {
throw endStream.error;
}
endStream.metadata.forEach((value, key) =>
trailerTarget.set(key, value)
);
continue;
}
yield parse(data);
}
if (!endStreamReceived) {
throw "missing EndStreamResponse";
}
}
async function createRequestBody(
input: AsyncIterable<I>
): Promise<Uint8Array> {
if (method.kind != MethodKind.ServerStreaming) {
throw "The fetch API does not support streaming request bodies";
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw "missing request message";
}
return encodeEnvelope(0, serialize(r.value));
}
return await runStreamingCall<I, O>({
interceptors: options.interceptors,
timeoutMs,
signal,
req: {
stream: true,
service,
method,
url: createMethodUrl(options.baseUrl, service, method),
init: {
method: "POST",
credentials: options.credentials ?? "same-origin",
redirect: "error",
mode: "cors",
},
header: requestHeader(
method.kind,
useBinaryFormat,
timeoutMs,
header
),
message: input,
},
next: async (req) => {
const fRes = await fetch(req.url, {
...req.init,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
});
validateResponse(method.kind, fRes.status, fRes.headers);
if (fRes.body === null) {
throw "missing response body";
}
const trailer = new Headers();
const res: StreamResponse<I, O> = {
...req,
header: fRes.headers,
trailer,
message: parseResponseBody(fRes.body, trailer),
};
return res;
},
});
},
};
}