private _connectCore()

in sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts [563:770]


  /**
   * Creates a WebSocket client for `uri` (using the current protocol's name as the
   * sub-protocol) and registers its open / error / close / message callbacks.
   *
   * The returned promise:
   * - resolves after the open callback has moved the client to `Connected`;
   * - rejects if the socket reports an error, if it closes while the client is not in
   *   the `Connected` state, or if `_isStopping` is set by the time the socket opens.
   *
   * A promise settles only once, so `reject` calls made by the error/close callbacks
   * after a successful open have no effect on the returned promise.
   *
   * @param uri - The URI passed to the WebSocket client factory.
   * @returns A promise that settles as described above.
   * @throws Error thrown synchronously (not as a rejection) when the client is stopping.
   */
  private _connectCore(uri: string): Promise<void> {
    if (this._isStopping) {
      throw new Error("Can't start a client during stopping");
    }

    return new Promise<void>((resolve, reject) => {
      // This part is executed sync
      const client = (this._wsClient = this._getWebSocketClientFactory().create(
        uri,
        this._protocol.name,
      ));

      client.onopen(() => {
        // There's a case that client called stop() before this method. We need to check and close it if it's the case.
        if (this._isStopping) {
          try {
            client.close();
          } catch {
            /** empty */
          }

          reject(new Error(`The client is stopped`));
          // Must not fall through: otherwise the state would flip to Connected, a new
          // sequence-ack task would be started for a socket we just closed, and
          // resolve() would be called after reject().
          return;
        }

        logger.verbose("WebSocket connection has opened");
        this._changeState(WebPubSubClientState.Connected);
        if (this._protocol.isReliableSubProtocol) {
          // Replace any ack task left over from a previous connection.
          if (this._sequenceAckTask != null) {
            this._sequenceAckTask.abort();
          }
          this._sequenceAckTask = new AbortableTask(async () => {
            await this._trySendSequenceAck();
          }, 1000);
        }

        resolve();
      });

      client.onerror((e) => {
        if (this._sequenceAckTask != null) {
          this._sequenceAckTask.abort();
        }
        reject(e);
      });

      client.onclose((code: number, reason: string) => {
        if (this._state === WebPubSubClientState.Connected) {
          logger.verbose("WebSocket closed after open");
          if (this._sequenceAckTask != null) {
            this._sequenceAckTask.abort();
          }
          logger.info(`WebSocket connection closed. Code: ${code}, Reason: ${reason}`);
          this._lastCloseEvent = { code: code, reason: reason };
          this._handleConnectionClose();
        } else {
          logger.verbose("WebSocket closed before open");
          reject(new Error(`Failed to start WebSocket: ${code}`));
        }
      });

      // The per-kind handlers below only capture `this`, so they are created once per
      // connection rather than once per incoming message.

      /** Settles the pending entry in `_ackMap` (if any) that matches the ack's id. */
      const handleAckMessage = (message: AckMessage): void => {
        if (this._ackMap.has(message.ackId)) {
          const entity = this._ackMap.get(message.ackId)!;
          this._ackMap.delete(message.ackId);
          // An error named "Duplicate" is reported to the caller as a success
          // with `isDuplicated: true` instead of as a failure.
          const isDuplicated: boolean =
            message.error != null && message.error.name === "Duplicate";
          if (message.success || isDuplicated) {
            entity.resolve({
              ackId: message.ackId,
              isDuplicated: isDuplicated,
            } as WebPubSubResult);
          } else {
            entity.reject(
              new SendMessageError("Failed to send message.", {
                ackId: message.ackId,
                errorDetail: message.error,
              } as SendMessageErrorOptions),
            );
          }
        }
      };

      /**
       * Stores the connection id / reconnection token. The first time this runs
       * (`_isInitialConnected` still false) it also re-joins every group marked as
       * joined when `autoRejoinGroups` is enabled, then emits the connected event.
       */
      const handleConnectedMessage = async (message: ConnectedMessage): Promise<void> => {
        this._connectionId = message.connectionId;
        this._reconnectionToken = message.reconnectionToken;

        if (!this._isInitialConnected) {
          this._isInitialConnected = true;

          if (this._options.autoRejoinGroups) {
            const groupPromises: Promise<void>[] = [];
            this._groupMap.forEach((g) => {
              if (g.isJoined) {
                groupPromises.push(
                  (async () => {
                    try {
                      await this._joinGroupCore(g.name);
                    } catch (err) {
                      // A failed re-join is reported per group and does not fail the others.
                      this._safeEmitRejoinGroupFailed(g.name, err);
                    }
                  })(),
                );
              }
            });

            await Promise.all(groupPromises).catch(() => {
              /** empty */
            });
          }

          this._safeEmitConnected(message.connectionId, message.userId);
        }
      };

      /** Remembers the most recent disconnected message. */
      const handleDisconnectedMessage = (message: DisconnectedMessage): void => {
        this._lastDisconnectedMessage = message;
      };

      /**
       * Shared sequence-id bookkeeping for data messages. Returns `true` when the
       * message must be dropped as a duplicate (`tryUpdate` reported a diff of 0).
       * Messages without a sequence id are never treated as duplicates.
       */
      const isDuplicatedBySequence = (message: GroupDataMessage | ServerDataMessage): boolean => {
        if (message.sequenceId == null) {
          return false;
        }

        const diff = this._sequenceId.tryUpdate(message.sequenceId);
        if (diff === 0) {
          // drop duplicated message
          return true;
        }

        // If the diff is larger than the threshold, we must ack quicker to avoid slow client drop.
        if (diff > this._quickSequenceAckDiff) {
          this._trySendSequenceAck();
        }
        return false;
      };

      /** Emits a group data message unless it is a duplicate by sequence id. */
      const handleGroupDataMessage = (message: GroupDataMessage): void => {
        if (isDuplicatedBySequence(message)) {
          return;
        }
        this._safeEmitGroupMessage(message);
      };

      /** Emits a server data message unless it is a duplicate by sequence id. */
      const handleServerDataMessage = (message: ServerDataMessage): void => {
        if (isDuplicatedBySequence(message)) {
          return;
        }
        this._safeEmitServerMessage(message);
      };

      client.onmessage((data: any) => {
        let messages: WebPubSubMessage[] | WebPubSubMessage | null;
        try {
          let convertedData: Buffer | ArrayBuffer | string;
          if (Array.isArray(data)) {
            // An array payload is treated as a list of Buffer fragments and joined.
            convertedData = Buffer.concat(data);
          } else {
            convertedData = data;
          }

          messages = this._protocol.parseMessages(convertedData);
          if (messages === null) {
            // null means the message is not recognized.
            return;
          }
        } catch (err) {
          logger.warning("An error occurred while parsing the message from service", err);
          throw err;
        }

        if (!Array.isArray(messages)) {
          messages = [messages];
        }

        messages.forEach((message) => {
          try {
            switch (message.kind) {
              case "ack": {
                handleAckMessage(message as AckMessage);
                break;
              }
              case "connected": {
                // The handler is async and deliberately not awaited, so the surrounding
                // try/catch cannot see its failures; observe the rejection explicitly
                // to avoid an unhandled promise rejection.
                handleConnectedMessage(message as ConnectedMessage).catch((err) => {
                  logger.warning(
                    `An error occurred while handling the message with kind: ${message.kind} from service`,
                    err,
                  );
                });
                break;
              }
              case "disconnected": {
                handleDisconnectedMessage(message as DisconnectedMessage);
                break;
              }
              case "groupData": {
                handleGroupDataMessage(message as GroupDataMessage);
                break;
              }
              case "serverData": {
                handleServerDataMessage(message as ServerDataMessage);
                break;
              }
            }
          } catch (err) {
            // A failure in one message must not prevent the remaining ones from being handled.
            logger.warning(
              `An error occurred while handling the message with kind: ${message.kind} from service`,
              err,
            );
          }
        });
      });
    });
  }