in sdk/server-proxies/src/tunnels/TunnelConnection.ts [231:313]
/**
 * Handles one message received over a tunnel connection, dispatching on `message.Type`:
 *
 * - `HttpResponse`: writes the response to the entry registered in `_ackMap` under the
 *   message's `AckId` (if any) and removes that entry once the response is complete.
 * - `HttpRequest`: invokes `requestHandler` and, when it yields a response, sends it back
 *   through `client`.
 * - `ConnectionReconnect`: stops the connection for `client.id`, then starts a new one to the
 *   endpoint/target carried by the message.
 * - `ConnectionClose`: stops the connection for `client.id`.
 * - `ConnectionRebalance`: starts an additional connection to the endpoint/target carried by
 *   the message, without stopping the current one.
 * - anything else: logged and ignored.
 *
 * Any error thrown while handling the message is caught and logged as a warning; it is not
 * rethrown to the caller.
 *
 * @param client - The tunnel client the message arrived on; also used to send replies.
 * @param message - The tunnel message to process.
 * @param abortSignal - Optional signal forwarded to the request handler, the send, and any
 *   connection start triggered by the message.
 */
private async processMessage(client: WebPubSubTunnelClient, message: TunnelMessage, abortSignal?: AbortSignalLike): Promise<void> {
  try {
    switch (message.Type) {
      case TunnelMessageType.HttpResponse: {
        const tunnelResponse = message as TunnelHttpResponseMessage;
        logger.info(`Getting http response ${tunnelResponse.TracingId ?? ""}: ackId: ${tunnelResponse.AckId}, statusCode: ${tunnelResponse.StatusCode}, notComplete: ${tunnelResponse.NotCompleted}, hub: ${this.hub}`);
        const ackId = tunnelResponse.AckId;
        // Single lookup instead of has() + get()!; responses with no registered entry are dropped.
        const entity = this._ackMap.get(ackId);
        if (entity !== undefined) {
          entity.write(tunnelResponse, null, !tunnelResponse.NotCompleted);
          // Keep the entry while more chunks are expected (NotCompleted), drop it on the final one.
          if (!tunnelResponse.NotCompleted) {
            this._ackMap.delete(ackId);
          }
        }
        break;
      }
      case TunnelMessageType.HttpRequest: {
        const tunnelRequest = message as TunnelHttpRequestMessage;
        logger.info(`Getting http request ${tunnelRequest.TracingId ?? ""}: ackId: ${tunnelRequest.AckId}, method: ${tunnelRequest.HttpMethod}, url: ${tunnelRequest.Url}, hub: ${this.hub}`);
        if (!this.requestHandler) {
          throw new Error("Request handler not configured");
        }
        const response = await this.requestHandler(tunnelRequest, abortSignal);
        if (response) {
          logger.info(`Sending response back ${tunnelRequest.TracingId ?? ""}: ackId:${tunnelRequest.AckId}, statusCode: ${response.StatusCode}, content-length: ${response.Content.length}, hub: ${this.hub}`);
          await client.sendAsync(
            new TunnelHttpResponseMessage(
              tunnelRequest.AckId,
              tunnelRequest.LocalRouting,
              response.StatusCode,
              tunnelRequest.ChannelName,
              false,
              response.Headers,
              response.Content
            ),
            abortSignal
          );
        }
        break;
      }
      case TunnelMessageType.ConnectionReconnect: {
        const reconnect = message as TunnelConnectionReconnectMessage;
        logger.info(`Reconnect the connection ${client.getPrintableIdentifier()}: ${reconnect.Message}, hub: ${this.hub}`);
        await this.stopConnection(client.id);
        await this.startConnectionAsync(
          {
            endpoint: reconnect.Endpoint,
            target: reconnect.TargetId,
          },
          () => true, // keep it retry forever
          abortSignal
        );
        break;
      }
      case TunnelMessageType.ConnectionClose: {
        const close = message as TunnelConnectionCloseMessage;
        logger.info(`Close the connection ${client.getPrintableIdentifier()}: ${close.Message}, hub: ${this.hub}`);
        // Awaited (as in the reconnect branch) so a failure while stopping is caught and logged
        // below instead of surfacing as an unhandled promise rejection.
        await this.stopConnection(client.id);
        break;
      }
      case TunnelMessageType.ConnectionRebalance: {
        const rebalance = message as TunnelConnectionRebalanceMessage;
        logger.info(`Start another rebalance connection ${rebalance.Endpoint} -> ${rebalance.TargetId}, via connection: ${client.getPrintableIdentifier()}, hub: ${this.hub}`);
        await this.startConnectionAsync(
          {
            endpoint: rebalance.Endpoint,
            target: rebalance.TargetId,
          },
          () => true, // retry forever to be consistent with current logic
          abortSignal
        );
        break;
      }
      default: {
        logger.info(`[TunnelConnection] Not Support TBD message type: ${message.Type}, hub: ${this.hub}`);
        break;
      }
    }
  } catch (err) {
    // Best-effort processing: a failing message must not break the caller, so log and continue.
    logger.warning(`Error processing message: ${err}, hub: ${this.hub}`);
  }
}