in sdk/server-proxies/src/tunnels/TunnelConnection.ts [88:154]
public async invokeAsync(httpRequest: HttpRequestLike, abortSignal?: AbortSignalLike, consistentKey?: string): Promise<HttpResponseLike> {
const client = this.getClient(consistentKey);
if (!client) {
throw new Error("No connection available.");
}
const ackId = this.nextAckId();
const pcs = new PromiseCompletionSource<HttpResponseLike>();
let firstResponse = false;
let ackMap = this._ackMap;
let body = new AsyncIterator<Uint8Array>();
let ackEntity = new AckEntity<TunnelHttpResponseMessage>(
ackId,
(data: TunnelHttpResponseMessage | undefined, error: string | null, done: boolean) => {
// handle body
if (error) {
body.error(error);
}
if (data!.Content) {
body.add(data!.Content);
}
if (done) {
body.close();
}
if (!firstResponse) {
firstResponse = true;
if (consistentKey) {
this.releaseClient(consistentKey, client.id);
}
pcs.resolve({
statusCode: data!.StatusCode,
body: body,
} as HttpResponseLike);
}
},
abortSignal
);
ackMap.set(ackId, ackEntity);
try {
if (consistentKey) {
this.lockClient(consistentKey, client);
}
let headers = {};
if (httpRequest.contentType) {
headers = { "Content-Type": [httpRequest.contentType] };
}
await client.sendAsync(
new TunnelHttpRequestMessage(ackId, true, "", httpRequest.method, httpRequest.url, headers, httpRequest.content),
abortSignal
);
logger.info(`Sent http request: ackId: ${ackId}, method: ${httpRequest.method}, url: ${httpRequest.url}, hub: ${this.hub}`);
// Wait for the first response which contains status code / headers
return pcs.promise;
} catch (err) {
this._ackMap.delete(ackId);
if (consistentKey) {
this.releaseClient(consistentKey, client.id);
}
throw err;
}
}