in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [225:343]
/**
 * Starts a single GraphQL subscription over the shared AppSync real-time
 * WebSocket.
 *
 * Steps, in order:
 *  1. Registers the subscription in `subscriptionObserverMap` in the
 *     `PENDING` state so that incoming messages can be routed to `observer`.
 *  2. Builds the `GQL_START` message, including the authorization headers
 *     produced by `_awsRealTimeHeaderBasedAuth` plus the user-agent header.
 *  3. Awaits `_initializeWebSocketConnection`.
 *  4. Re-registers the subscription with a START_ACK timeout armed, then
 *     sends the start message if `awsRealTimeSocket` is set.
 *
 * If the connection attempt rejects, `observer.error` is called with a
 * `Connection failed: <message>` GraphQL error, followed by
 * `observer.complete()`, and any registered `subscriptionFailedCallback`
 * is invoked.
 *
 * @param options - Connection and operation settings: `appSyncGraphqlEndpoint`,
 *   `authenticationType`, `query`, `variables`, `apiKey`, `region`,
 *   `graphql_headers` (defaults to a function returning `{}`), `credentials`
 *   and `token`.
 * @param observer - Observer that receives the subscription's errors and
 *   completion from this method.
 * @param subscriptionId - Key under which the subscription is tracked and
 *   which is sent as the `id` of the start message.
 */
private async _startSubscriptionWithAWSAppSyncRealTime({
  options,
  observer,
  subscriptionId
}) {
  const {
    appSyncGraphqlEndpoint,
    authenticationType,
    query,
    variables,
    apiKey,
    region,
    graphql_headers = () => ({}),
    credentials,
    token
  } = options;
  const subscriptionState: SUBSCRIPTION_STATUS = SUBSCRIPTION_STATUS.PENDING;
  const data = {
    query,
    variables
  };

  // Having a subscription id map will make it simple to forward messages received
  this.subscriptionObserverMap.set(subscriptionId, {
    observer,
    query,
    variables,
    subscriptionState,
    startAckTimeoutId: null
  });

  // Preparing payload for subscription message
  const dataString = JSON.stringify(data);
  const headerObj = {
    ...(await this._awsRealTimeHeaderBasedAuth({
      apiKey,
      appSyncGraphqlEndpoint,
      authenticationType,
      payload: dataString,
      canonicalUri: "",
      region,
      credentials,
      token,
      graphql_headers
    })),
    [USER_AGENT_HEADER]: USER_AGENT
  };
  const subscriptionMessage = {
    id: subscriptionId,
    payload: {
      data: dataString,
      extensions: {
        // headerObj is a fresh literal, so it can be embedded without copying.
        authorization: headerObj
      }
    },
    type: MESSAGE_TYPES.GQL_START
  };
  const stringToAWSRealTime = JSON.stringify(subscriptionMessage);

  try {
    await this._initializeWebSocketConnection({
      apiKey,
      appSyncGraphqlEndpoint,
      authenticationType,
      region,
      credentials,
      token
    });
  } catch (err) {
    // A promise may reject with null/undefined; guard so that the observer
    // is still notified instead of this handler throwing a TypeError.
    const { message = "" } = err || {};
    observer.error({
      errors: [
        {
          ...new GraphQLError(`Connection failed: ${message}`)
        }
      ]
    });
    observer.complete();

    const { subscriptionFailedCallback } =
      this.subscriptionObserverMap.get(subscriptionId) || {};

    // Notify concurrent unsubscription
    if (typeof subscriptionFailedCallback === "function") {
      subscriptionFailedCallback();
    }
    return;
  }

  // There could be a race condition when unsubscribe gets called during _initializeWebSocketConnection
  // For example if unsubscribe gets invoked before it finishes WebSocket handshake or START_ACK
  // subscriptionFailedCallback and subscriptionReadyCallback are used to synchronize that
  const pendingEntry = this.subscriptionObserverMap.get(subscriptionId);
  if (!pendingEntry) {
    // The entry was removed while the connection was being established.
    // Destructuring it would throw, and re-registering it would start a
    // subscription that is no longer tracked, so stop here without arming
    // the START_ACK timer or sending the start message.
    return;
  }
  const { subscriptionFailedCallback, subscriptionReadyCallback } = pendingEntry;

  // This must be done before sending the message in order to be listening immediately
  this.subscriptionObserverMap.set(subscriptionId, {
    observer,
    subscriptionState,
    variables,
    query,
    subscriptionReadyCallback,
    subscriptionFailedCallback,
    // The arrow function keeps the lexical `this`, so a direct call suffices.
    startAckTimeoutId: (setTimeout(() => {
      this._timeoutStartSubscriptionAck(subscriptionId);
    }, START_ACK_TIMEOUT) as unknown) as number
  });

  if (this.awsRealTimeSocket) {
    this.awsRealTimeSocket.send(stringToAWSRealTime);
  }
}