request()

in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [76:150]


  request(operation: Operation) {
    const { query, variables } = operation;
    const {
      controlMessages: { [CONTROL_EVENTS_KEY]: controlEvents } = {
        [CONTROL_EVENTS_KEY]: undefined
      },
      headers
    } = operation.getContext();
    return new Observable<FetchResult>(observer => {
      if (!this.url) {
        observer.error({
          errors: [
            {
              ...new GraphQLError(
                `Subscribe only available for AWS AppSync endpoint`
              ),
            },
          ],
        });
        observer.complete();
      } else {
        const subscriptionId = uuid();
        let token = this.auth.type === AUTH_TYPE.AMAZON_COGNITO_USER_POOLS ||
          this.auth.type === AUTH_TYPE.OPENID_CONNECT
          ? this.auth.jwtToken
          : null;

        token = this.auth.type === AUTH_TYPE.AWS_LAMBDA ? this.auth.token : token;

        const options = {
          appSyncGraphqlEndpoint: this.url,
          authenticationType: this.auth.type,
          query: print(query),
          region: this.region,
          graphql_headers: () => (headers),
          variables,
          apiKey: this.auth.type === AUTH_TYPE.API_KEY ? this.auth.apiKey : "",
          credentials:
            this.auth.type === AUTH_TYPE.AWS_IAM ? this.auth.credentials : null,
          token
        };

        this._startSubscriptionWithAWSAppSyncRealTime({
          options,
          observer,
          subscriptionId
        });

        return async () => {
          // Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime
          try {
            this._verifySubscriptionAlreadyStarted(subscriptionId);
            const { subscriptionState } = this.subscriptionObserverMap.get(
              subscriptionId
            );
            if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) {
              this._sendUnsubscriptionMessage(subscriptionId);
            } else {
              throw new Error(
                "Subscription has failed, starting to remove subscription."
              );
            }
          } catch (err) {
            this._removeSubscriptionObserver(subscriptionId);
            return;
          }
        };
      }
    }).filter(data => {
      const { extensions: { controlMsgType = undefined } = {} } = data;
      const isControlMsg = typeof controlMsgType !== "undefined";

      return controlEvents === true || !isControlMsg;
    });
  }