private async _startSubscriptionWithAWSAppSyncRealTime()

in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [225:343]


  /**
   * Registers a subscription observer, makes sure the WebSocket connection is
   * initialized and sends the GQL_START message for the subscription.
   *
   * Steps, in order:
   *  1. Store the observer in `subscriptionObserverMap` under `subscriptionId`
   *     in PENDING state so incoming messages can be routed to it.
   *  2. Build the start message; its `extensions.authorization` carries the
   *     headers returned by `_awsRealTimeHeaderBasedAuth` plus the user agent.
   *  3. Await `_initializeWebSocketConnection`. On failure the observer gets a
   *     "Connection failed" GraphQL error, is completed, and any registered
   *     `subscriptionFailedCallback` is invoked.
   *  4. Re-register the entry with a start-ack timeout and send the message.
   *
   * @param options Subscription settings (endpoint, auth type and credentials,
   *   query, variables, optional `graphql_headers` factory).
   * @param observer Receives the connection error, if any.
   * @param subscriptionId Key of the subscription in `subscriptionObserverMap`
   *   and `id` of the start message.
   * @returns A promise that resolves once the start message has been handed to
   *   the socket, or once the failure/removal paths have been handled.
   */
  private async _startSubscriptionWithAWSAppSyncRealTime({
    options,
    observer,
    subscriptionId
  }): Promise<void> {
    const {
      appSyncGraphqlEndpoint,
      authenticationType,
      query,
      variables,
      apiKey,
      region,
      graphql_headers = () => ({}),
      credentials,
      token
    } = options;
    const subscriptionState: SUBSCRIPTION_STATUS = SUBSCRIPTION_STATUS.PENDING;
    const data = {
      query,
      variables
    };
    // Having a subscription id map will make it simple to forward messages received
    this.subscriptionObserverMap.set(subscriptionId, {
      observer,
      query,
      variables,
      subscriptionState,
      startAckTimeoutId: null,
    });

    // Preparing payload for subscription message

    const dataString = JSON.stringify(data);
    const headerObj = {
      ...(await this._awsRealTimeHeaderBasedAuth({
        apiKey,
        appSyncGraphqlEndpoint,
        authenticationType,
        payload: dataString,
        canonicalUri: "",
        region,
        credentials,
        token,
        graphql_headers
      })),
      [USER_AGENT_HEADER]: USER_AGENT
    };

    const subscriptionMessage = {
      id: subscriptionId,
      payload: {
        data: dataString,
        extensions: {
          authorization: {
            ...headerObj
          }
        }
      },
      type: MESSAGE_TYPES.GQL_START
    };

    const stringToAWSRealTime = JSON.stringify(subscriptionMessage);

    try {
      await this._initializeWebSocketConnection({
        apiKey,
        appSyncGraphqlEndpoint,
        authenticationType,
        region,
        credentials,
        token
      });
    } catch (err) {
      const { message = "" } = err;
      observer.error({
        errors: [
          {
            ...new GraphQLError(`Connection failed: ${message}`)
          }
        ]
      });
      observer.complete();

      // The entry may already be gone at this point, hence the `|| {}` guard.
      const { subscriptionFailedCallback } =
        this.subscriptionObserverMap.get(subscriptionId) || {};

      // Notify concurrent unsubscription
      if (typeof subscriptionFailedCallback === "function") {
        subscriptionFailedCallback();
      }
      return;
    }

    // There could be a race condition when unsubscribe gets called during _initializeWebSocketConnection
    // For example if unsubscribe gets invoked before it finishes WebSocket handshake or START_ACK
    // subscriptionFailedCallback and subscriptionReadyCallback are used to synchronize that

    const pendingSubscription = this.subscriptionObserverMap.get(subscriptionId);

    // If the entry was removed while the connection was being established,
    // there is nothing left to start. Bail out instead of destructuring
    // `undefined` (which throws a TypeError) or re-registering and starting a
    // subscription that has already been removed.
    if (!pendingSubscription) {
      return;
    }

    const {
      subscriptionFailedCallback,
      subscriptionReadyCallback
    } = pendingSubscription;

    // This must be done before sending the message in order to be listening immediately
    this.subscriptionObserverMap.set(subscriptionId, {
      observer,
      subscriptionState,
      variables,
      query,
      subscriptionReadyCallback,
      subscriptionFailedCallback,
      // Fires _timeoutStartSubscriptionAck for this subscription after
      // START_ACK_TIMEOUT; the id is stored on the entry.
      startAckTimeoutId: (setTimeout(() => {
        this._timeoutStartSubscriptionAck(subscriptionId);
      }, START_ACK_TIMEOUT) as unknown) as number
    });

    // The message is only sent when a socket instance is present; otherwise
    // the start-ack timeout scheduled above is the only thing left running.
    if (this.awsRealTimeSocket) {
      this.awsRealTimeSocket.send(stringToAWSRealTime);
    }
  }