private _handleIncomingSubscriptionMessage()

in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [647:736]


  /**
   * Dispatches a single WebSocket message received from AWS AppSync RealTime.
   *
   * The message body is parsed as JSON and routed by its `type`:
   * - `GQL_DATA` carrying `payload.data`: forwards the payload to the
   *   subscription's observer.
   * - `GQL_START_ACK`: invokes the ready callback, clears the start-ack
   *   timeout, emits a `CONNECTED` control message to the observer and stores
   *   the subscription as `SUBSCRIPTION_STATUS.CONNECTED`.
   * - `GQL_CONNECTION_KEEP_ALIVE`: re-arms the keep-alive timer, which calls
   *   `_errorDisconnect` with `CONTROL_MSG.TIMEOUT_DISCONNECT` when it expires.
   * - `GQL_ERROR`: stores the subscription as `SUBSCRIPTION_STATUS.FAILED`,
   *   errors and completes the observer (when one is registered), clears the
   *   start-ack timeout and invokes the failure callback.
   *
   * Any other message is ignored.
   *
   * @param message - WebSocket message event; `message.data` is parsed as JSON.
   * @throws SyntaxError if `message.data` is not valid JSON (from `JSON.parse`).
   */
  private _handleIncomingSubscriptionMessage(message: MessageEvent) {
    logger(`subscription message from AWS AppSync RealTime: ${message.data}`);
    const { id = "", payload, type } = JSON.parse(message.data);
    // Every field gets a default so the handler can run even when no
    // subscription is registered under `id`.
    const {
      observer = null,
      query = "",
      variables = {},
      startAckTimeoutId = 0,
      subscriptionReadyCallback = null,
      subscriptionFailedCallback = null
    } = this.subscriptionObserverMap.get(id) || {};

    logger({ id, observer, query, variables });

    if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) {
      if (observer) {
        observer.next(payload);
      } else {
        logger(`observer not found for id: ${id}`);
      }
      return;
    }

    if (type === MESSAGE_TYPES.GQL_START_ACK) {
      logger(`subscription ready for ${JSON.stringify({ query, variables })}`);
      if (typeof subscriptionReadyCallback === "function") {
        subscriptionReadyCallback();
      }
      // The ack arrived, so the pending start-ack timeout must not fire.
      clearTimeout(startAckTimeoutId as number);
      if (observer) {
        observer.next({
          data: payload,
          extensions: {
            controlMsgType: "CONNECTED"
          }
        });
      } else {
        logger(`observer not found for id: ${id}`);
      }

      const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED;
      this.subscriptionObserverMap.set(id, {
        observer,
        query,
        variables,
        startAckTimeoutId: null,
        subscriptionState,
        subscriptionReadyCallback,
        subscriptionFailedCallback
      });

      return;
    }

    if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
      // Restart the keep-alive countdown on every keep-alive message.
      clearTimeout(this.keepAliveTimeoutId);
      this.keepAliveTimeoutId = setTimeout(
        this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT),
        this.keepAliveTimeout
      );
      return;
    }

    if (type === MESSAGE_TYPES.GQL_ERROR) {
      const subscriptionState = SUBSCRIPTION_STATUS.FAILED;
      this.subscriptionObserverMap.set(id, {
        observer,
        query,
        variables,
        startAckTimeoutId,
        subscriptionReadyCallback,
        subscriptionFailedCallback,
        subscriptionState
      });

      // `observer` is null when no subscription is registered under `id`.
      // Guard it (as the data and start-ack branches do) so an error for an
      // unknown id does not throw and skip the cleanup below.
      if (observer) {
        observer.error({
          errors: [
            {
              ...new GraphQLError(`Connection failed: ${JSON.stringify(payload)}`)
            }
          ]
        });
      } else {
        logger(`observer not found for id: ${id}`);
      }
      clearTimeout(startAckTimeoutId);

      if (observer) {
        observer.complete();
      }
      if (typeof subscriptionFailedCallback === "function") {
        subscriptionFailedCallback();
      }
    }
  }