in packages/dubbo/src/protocol-triple/transport.ts [61:307]
/**
 * Create a Transport that issues calls through `opt.httpClient`.
 *
 * The returned object exposes two methods:
 * - `unary` serializes a single request message, compresses it when a send
 *   compression is configured and the payload exceeds `opt.compressMinBytes`,
 *   performs one HTTP exchange, buffers the whole response body and parses it.
 * - `stream` sends the input messages as a sequence of envelopes and returns a
 *   response whose `message` iterable yields parsed messages while enforcing
 *   that exactly one EndStreamResponse arrives, and that it arrives last.
 *
 * Both methods are executed through the interceptors in `opt.interceptors`.
 */
export function createTransport(opt: CommonTransportOptions): Transport {
  return {
    /**
     * Perform a unary call.
     *
     * Rejects with the error decoded from the response body when response
     * validation reports a unary error.
     */
    async unary<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      message: PartialMessage<I>
    ): Promise<UnaryResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        method.kind,
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );
      // Accept either a ready-made message instance or a plain partial object.
      const requestMessage: I =
        message instanceof method.I ? message : new method.I(message);

      return await runUnaryCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: false,
          service,
          method,
          url,
          init: {},
          header: requestHeader,
          message: requestMessage,
        },
        next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
          // Serialize first; compression is decided on the serialized size.
          let payload = codecs
            .getI(opt.useBinaryFormat)
            .serialize(req.message);
          const compressor = opt.sendCompression;
          if (compressor && payload.byteLength > opt.compressMinBytes) {
            payload = await compressor.compress(payload);
            req.header.set(headerUnaryEncoding, compressor.name);
          } else {
            // Payload goes out uncompressed, so no encoding header may remain.
            req.header.delete(headerUnaryEncoding);
          }

          // GET is only used when enabled and the method has no side effects.
          // In that case the request is rewritten and no body is sent.
          const sendAsGet =
            opt.useHttpGet === true &&
            method.idempotency === MethodIdempotency.NoSideEffects;
          const outgoing = sendAsGet
            ? transformConnectPostToGetRequest(
                req,
                payload,
                opt.useBinaryFormat
              )
            : req;
          const body: AsyncIterable<Uint8Array> | undefined = sendAsGet
            ? undefined
            : createAsyncIterable([payload]);

          const httpRes = await opt.httpClient({
            url: outgoing.url,
            method: outgoing.init.method ?? "POST",
            header: outgoing.header,
            signal: outgoing.signal,
            body,
          });

          const verdict = validateResponseWithCompression(
            method.kind,
            opt.acceptCompression,
            httpRes.status,
            httpRes.header
          );
          const [resHeader, resTrailer] = trailerDemux(httpRes.header);

          // Read the complete body, bounded by readMaxBytes.
          const sink = sinkAllBytes(
            opt.readMaxBytes,
            httpRes.header.get(headerUnaryContentLength)
          );
          let bytes = await pipeTo(httpRes.body, sink, {
            propagateDownStreamError: false,
          });
          if (verdict.compression) {
            bytes = await verdict.compression.decompress(
              bytes,
              opt.readMaxBytes
            );
          }

          // For an error response, the body carries the error as JSON.
          if (verdict.isUnaryError) {
            throw errorFromJsonBytes(
              bytes,
              appendHeaders(resHeader, resTrailer),
              verdict.unaryError
            );
          }

          const parsed = codecs.getO(opt.useBinaryFormat).parse(bytes);
          return {
            stream: false,
            service,
            method,
            header: resHeader,
            message: parsed,
            trailer: resTrailer,
          } as UnaryResponse<I, O>;
        },
      });
    },

    /**
     * Perform a streaming call.
     *
     * The request is always sent with POST. Iterating the returned `message`
     * raises a ConnectError with Code.InvalidArgument when the end-of-stream
     * framing is violated, or the error carried by the EndStreamResponse.
     */
    async stream<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      input: AsyncIterable<I>
    ): Promise<StreamResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const endStreamCodec = createEndStreamSerialization(opt.jsonOptions);
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        method.kind,
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );
      const normalizedInput = pipe(input, transformNormalizeMessage(method.I), {
        propagateDownStreamError: true,
      });

      return runStreamingCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: true,
          service,
          method,
          url,
          init: {
            method: "POST",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader,
          message: normalizedInput,
        },
        next: async (req: StreamRequest<I, O>) => {
          // Outgoing pipeline: normalize -> serialize -> compress -> join.
          const requestBody = pipe(
            req.message,
            transformNormalizeMessage(method.I),
            transformSerializeEnvelope(codecs.getI(opt.useBinaryFormat)),
            transformCompressEnvelope(
              opt.sendCompression,
              opt.compressMinBytes
            ),
            transformJoinEnvelopes(),
            { propagateDownStreamError: true }
          );
          const httpRes = await opt.httpClient({
            url: req.url,
            method: "POST",
            header: req.header,
            signal: req.signal,
            body: requestBody,
          });
          const verdict = validateResponseWithCompression(
            method.kind,
            opt.acceptCompression,
            httpRes.status,
            httpRes.header
          );

          // The final pipeline stage writes into `response.trailer`; it only
          // runs once the caller iterates `message`, after `response` exists.
          const response: StreamResponse<I, O> = {
            ...req,
            header: httpRes.header,
            trailer: new Headers(),
            // Incoming pipeline: split -> decompress -> parse -> check framing.
            message: pipe(
              httpRes.body,
              transformSplitEnvelope(opt.readMaxBytes),
              transformDecompressEnvelope(
                verdict.compression ?? null,
                opt.readMaxBytes
              ),
              transformParseEnvelope(
                codecs.getO(opt.useBinaryFormat),
                endStreamFlag,
                endStreamCodec
              ),
              async function* (envelopes) {
                let sawEndStream = false;
                for await (const envelope of envelopes) {
                  if (!envelope.end) {
                    // Regular message: not allowed after the end marker.
                    if (sawEndStream) {
                      throw new ConnectError(
                        "protocol error: received extra message after EndStreamResponse",
                        Code.InvalidArgument
                      );
                    }
                    yield envelope.value;
                    continue;
                  }
                  // End marker: only a single one is permitted.
                  if (sawEndStream) {
                    throw new ConnectError(
                      "protocol error: received extra EndStreamResponse",
                      Code.InvalidArgument
                    );
                  }
                  sawEndStream = true;
                  const endStream = envelope.value;
                  if (endStream.error) {
                    throw endStream.error;
                  }
                  // Expose the end-of-stream metadata as response trailers.
                  endStream.metadata.forEach((value, key) => {
                    response.trailer.set(key, value);
                  });
                }
                if (!sawEndStream) {
                  throw new ConnectError(
                    "protocol error: missing EndStreamResponse",
                    Code.InvalidArgument
                  );
                }
              },
              { propagateDownStreamError: true }
            ),
          };
          return response;
        },
      });
    },
  };
}