in packages/dubbo/src/protocol-triple/transport.ts [101:173]
// Executes a single unary call over opt.httpClient:
//   1. serialize the request message (binary or not, per opt.useBinaryFormat),
//   2. compress it when a send compression is configured and the payload is
//      larger than opt.compressMinBytes,
//   3. send it as GET (side-effect-free methods with opt.useHttpGet) or with
//      the method from req.init (defaulting to "POST"),
//   4. read, optionally decompress, and parse the response body.
// Throws the error built by errorFromJsonBytes when the response was
// validated as a unary error.
next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
  // --- Request payload -----------------------------------------------------
  let payload = serialization
    .getI(opt.useBinaryFormat)
    .serialize(req.message);
  const compressor = opt.sendCompression;
  if (compressor && payload.byteLength > opt.compressMinBytes) {
    payload = await compressor.compress(payload);
    req.header.set(headerUnaryEncoding, compressor.name);
  } else {
    // Payload goes out uncompressed, so no encoding header may be announced.
    req.header.delete(headerUnaryEncoding);
  }

  // --- Request shape: GET carries the payload in the request itself --------
  const viaGet =
    opt.useHttpGet === true &&
    method.idempotency === MethodIdempotency.NoSideEffects;
  let outgoing = req;
  let body: AsyncIterable<Uint8Array> | undefined;
  if (viaGet) {
    // The transformed request replaces the original; `body` stays undefined.
    outgoing = transformConnectPostToGetRequest(
      req,
      payload,
      opt.useBinaryFormat
    );
  } else {
    body = createAsyncIterable([payload]);
  }

  // --- Round trip ----------------------------------------------------------
  const httpResponse = await opt.httpClient({
    url: outgoing.url,
    method: outgoing.init.method ?? "POST",
    header: outgoing.header,
    signal: outgoing.signal,
    body,
  });

  // --- Response handling ---------------------------------------------------
  const { compression, isUnaryError, unaryError } =
    validateResponseWithCompression(
      method.kind,
      opt.acceptCompression,
      httpResponse.status,
      httpResponse.header
    );
  // Split the received header set into regular headers and trailers.
  const [header, trailer] = trailerDemux(httpResponse.header);

  // Collect the whole body, bounded by opt.readMaxBytes; the content-length
  // header value is handed to the sink alongside that limit.
  const collectAll = sinkAllBytes(
    opt.readMaxBytes,
    httpResponse.header.get(headerUnaryContentLength)
  );
  let received = await pipeTo(httpResponse.body, collectAll, {
    propagateDownStreamError: false,
  });
  if (compression) {
    received = await compression.decompress(received, opt.readMaxBytes);
  }

  if (isUnaryError) {
    // The (decompressed) body is handed to errorFromJsonBytes together with
    // the merged headers/trailers and the error from validation.
    throw errorFromJsonBytes(
      received,
      appendHeaders(header, trailer),
      unaryError
    );
  }

  const message = serialization.getO(opt.useBinaryFormat).parse(received);
  return {
    stream: false,
    service,
    method,
    header,
    message,
    trailer,
  } as UnaryResponse<I, O>;
},