in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [76:150]
/**
 * Creates the observable for a single subscription operation.
 *
 * When subscribed to, the observable either reports an error (no `this.url`
 * is set) or hands the printed query, the variables and the auth details read
 * from `this.auth` to `_startSubscriptionWithAWSAppSyncRealTime` under a
 * freshly generated subscription id, and returns a teardown callback.
 *
 * Results whose `extensions.controlMsgType` is defined are only let through
 * when the operation context sets `controlMessages[CONTROL_EVENTS_KEY]` to
 * exactly `true`.
 *
 * @param operation - Supplies `query`, `variables` and the context
 *   (`controlMessages`, `headers`) read here.
 * @returns The filtered `Observable<FetchResult>`.
 */
request(operation: Operation) {
  const subscriptionQuery = operation.query;
  const subscriptionVariables = operation.variables;
  const {
    controlMessages: { [CONTROL_EVENTS_KEY]: wantsControlEvents } = {
      [CONTROL_EVENTS_KEY]: undefined
    },
    headers: contextHeaders
  } = operation.getContext();

  const source = new Observable<FetchResult>(subscriber => {
    // Without an endpoint URL there is nothing to subscribe to: report and stop.
    if (!this.url) {
      subscriber.error({
        errors: [
          {
            ...new GraphQLError(
              `Subscribe only available for AWS AppSync endpoint`
            ),
          },
        ],
      });
      subscriber.complete();
      return;
    }

    const id = uuid();

    // JWT for Cognito user pools / OIDC, the Lambda token for AWS_LAMBDA,
    // otherwise no token. The checks stay inline so `this.auth` is narrowed
    // before its mode-specific property is read.
    const token =
      this.auth.type === AUTH_TYPE.AMAZON_COGNITO_USER_POOLS ||
      this.auth.type === AUTH_TYPE.OPENID_CONNECT
        ? this.auth.jwtToken
        : this.auth.type === AUTH_TYPE.AWS_LAMBDA
        ? this.auth.token
        : null;

    this._startSubscriptionWithAWSAppSyncRealTime({
      options: {
        appSyncGraphqlEndpoint: this.url,
        authenticationType: this.auth.type,
        query: print(subscriptionQuery),
        region: this.region,
        graphql_headers: () => contextHeaders,
        variables: subscriptionVariables,
        apiKey: this.auth.type === AUTH_TYPE.API_KEY ? this.auth.apiKey : "",
        credentials:
          this.auth.type === AUTH_TYPE.AWS_IAM ? this.auth.credentials : null,
        token
      },
      observer: subscriber,
      subscriptionId: id
    });

    // Teardown: runs after unsubscribing, or when observer.complete was called
    // after _startSubscriptionWithAWSAppSyncRealTime.
    return async () => {
      try {
        // NOTE(review): the result of this call is not awaited; if the helper
        // is asynchronous the state check below may run before it settles.
        // Confirm against the helper's definition.
        this._verifySubscriptionAlreadyStarted(id);
        const {
          subscriptionState: currentState
        } = this.subscriptionObserverMap.get(id);
        if (currentState !== SUBSCRIPTION_STATUS.CONNECTED) {
          throw new Error(
            "Subscription has failed, starting to remove subscription."
          );
        }
        this._sendUnsubscriptionMessage(id);
      } catch (teardownError) {
        // Any failure above (including the deliberate throw) ends with the
        // observer entry being removed locally.
        this._removeSubscriptionObserver(id);
      }
    };
  });

  return source.filter(result => {
    const {
      extensions: { controlMsgType: messageKind = undefined } = {}
    } = result;
    // Keep everything when control events were requested; otherwise keep only
    // results that carry no control message type.
    return wantsControlEvents === true || typeof messageKind === "undefined";
  });
}