in sdk/core/ts-http-runtime/src/nodeHttpClient.ts [87:211]
public async sendRequest(request: PipelineRequest): Promise<PipelineResponse> {
const abortController = new AbortController();
let abortListener: ((event: any) => void) | undefined;
if (request.abortSignal) {
if (request.abortSignal.aborted) {
throw new AbortError("The operation was aborted. Request has already been canceled.");
}
abortListener = (event: Event) => {
if (event.type === "abort") {
abortController.abort();
}
};
request.abortSignal.addEventListener("abort", abortListener);
}
let timeoutId: ReturnType<typeof setTimeout> | undefined;
if (request.timeout > 0) {
timeoutId = setTimeout(() => {
const sanitizer = new Sanitizer();
logger.info(`request to '${sanitizer.sanitizeUrl(request.url)}' timed out. canceling...`);
abortController.abort();
}, request.timeout);
}
const acceptEncoding = request.headers.get("Accept-Encoding");
const shouldDecompress =
acceptEncoding?.includes("gzip") || acceptEncoding?.includes("deflate");
let body = typeof request.body === "function" ? request.body() : request.body;
if (body && !request.headers.has("Content-Length")) {
const bodyLength = getBodyLength(body);
if (bodyLength !== null) {
request.headers.set("Content-Length", bodyLength);
}
}
let responseStream: NodeJS.ReadableStream | undefined;
try {
if (body && request.onUploadProgress) {
const onUploadProgress = request.onUploadProgress;
const uploadReportStream = new ReportTransform(onUploadProgress);
uploadReportStream.on("error", (e) => {
logger.error("Error in upload progress", e);
});
if (isReadableStream(body)) {
body.pipe(uploadReportStream);
} else {
uploadReportStream.end(body);
}
body = uploadReportStream;
}
const res = await this.makeRequest(request, abortController, body);
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
const headers = getResponseHeaders(res);
const status = res.statusCode ?? 0;
const response: PipelineResponse = {
status,
headers,
request,
};
// Responses to HEAD must not have a body.
// If they do return a body, that body must be ignored.
if (request.method === "HEAD") {
// call resume() and not destroy() to avoid closing the socket
// and losing keep alive
res.resume();
return response;
}
responseStream = shouldDecompress ? getDecodedResponseStream(res, headers) : res;
const onDownloadProgress = request.onDownloadProgress;
if (onDownloadProgress) {
const downloadReportStream = new ReportTransform(onDownloadProgress);
downloadReportStream.on("error", (e) => {
logger.error("Error in download progress", e);
});
responseStream.pipe(downloadReportStream);
responseStream = downloadReportStream;
}
if (
// Value of POSITIVE_INFINITY in streamResponseStatusCodes is considered as any status code
request.streamResponseStatusCodes?.has(Number.POSITIVE_INFINITY) ||
request.streamResponseStatusCodes?.has(response.status)
) {
response.readableStreamBody = responseStream;
} else {
response.bodyAsText = await streamToText(responseStream);
}
return response;
} finally {
// clean up event listener
if (request.abortSignal && abortListener) {
let uploadStreamDone = Promise.resolve();
if (isReadableStream(body)) {
uploadStreamDone = isStreamComplete(body);
}
let downloadStreamDone = Promise.resolve();
if (isReadableStream(responseStream)) {
downloadStreamDone = isStreamComplete(responseStream);
}
Promise.all([uploadStreamDone, downloadStreamDone])
.then(() => {
// eslint-disable-next-line promise/always-return
if (abortListener) {
request.abortSignal?.removeEventListener("abort", abortListener);
}
})
.catch((e) => {
logger.warning("Error when cleaning up abortListener on httpRequest", e);
});
}
}
}