private async processMessage()

in sdk/server-proxies/src/tunnels/TunnelConnection.ts [231:313]


  private async processMessage(client: WebPubSubTunnelClient, message: TunnelMessage, abortSignal?: AbortSignalLike): Promise<void> {
    try {
      switch (message.Type) {
        case TunnelMessageType.HttpResponse: {
          const tunnelResponse = message as TunnelHttpResponseMessage;
          logger.info(`Getting http response ${tunnelResponse.TracingId ?? ""}: ackId: ${tunnelResponse.AckId}, statusCode: ${tunnelResponse.StatusCode}, notComplete: ${tunnelResponse.NotCompleted}, hub: ${this.hub}`);
          const ackId = tunnelResponse.AckId;
          if (this._ackMap.has(ackId)) {
            const entity = this._ackMap.get(ackId)!;
            entity.write(tunnelResponse, null, !tunnelResponse.NotCompleted);
            if (!tunnelResponse.NotCompleted) {
              this._ackMap.delete(ackId);
            }
          }
          break;
        }
        case TunnelMessageType.HttpRequest: {
          const tunnelRequest = message as TunnelHttpRequestMessage;
          logger.info(`Getting http request ${tunnelRequest.TracingId ?? ""}: ackId: ${tunnelRequest.AckId}, method: ${tunnelRequest.HttpMethod}, url: ${tunnelRequest.Url}, hub: ${this.hub}`);
          if (!this.requestHandler) {
            throw new Error("Request handler not configured");
          }
  
          const response = await this.requestHandler(tunnelRequest, abortSignal);
          if (response) {
            logger.info(`Sending response back ${tunnelRequest.TracingId ?? ""}: ackId:${tunnelRequest.AckId}, statusCode: ${response.StatusCode}, content-length: ${response.Content.length}, hub: ${this.hub}`);
            await client.sendAsync(
              new TunnelHttpResponseMessage(
                tunnelRequest.AckId,
                tunnelRequest.LocalRouting,
                response.StatusCode,
                tunnelRequest.ChannelName,
                false,
                response.Headers,
                response.Content
              ),
              abortSignal
            );
          }
          break;
        }
        case TunnelMessageType.ConnectionReconnect: {
          const reconnect = message as TunnelConnectionReconnectMessage;
          logger.info(`Reconnect the connection ${client.getPrintableIdentifier()}: ${reconnect.Message}, hub: ${this.hub}`);
          await this.stopConnection(client.id);
          await this.startConnectionAsync(
            {
              endpoint: reconnect.Endpoint,
              target: reconnect.TargetId,
            },
            ()=>true, // keep it retry forever
            abortSignal
          );
          break;
        }
        case TunnelMessageType.ConnectionClose: {
          const close = message as TunnelConnectionCloseMessage;
          logger.info(`Close the connection ${client.getPrintableIdentifier()}: ${close.Message}, hub: ${this.hub}`);
          this.stopConnection(client.id);
          break;
        }
        case TunnelMessageType.ConnectionRebalance: {
          const rebalance = message as TunnelConnectionRebalanceMessage;
          logger.info(`Start another rebalance connection ${rebalance.Endpoint} -> ${rebalance.TargetId}, via connection: ${client.getPrintableIdentifier()}, hub: ${this.hub}`);
          await this.startConnectionAsync(
            {
              endpoint: rebalance.Endpoint,
              target: rebalance.TargetId,
            },
            () => true, // retry forever to be consistent with current logic
            abortSignal
          );
          break;
        }
        default: {
          logger.info(`[TunnelConnection] Not Support TBD message type: ${message.Type}, hub: ${this.hub}`);
          break;
        }
      }
    } catch (err) {
      logger.warning(`Error processing message: ${err}, hub: ${this.hub}`); 
    }
  }