in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [647:736]
private _handleIncomingSubscriptionMessage(message: MessageEvent) {
logger(`subscription message from AWS AppSync RealTime: ${message.data}`);
const { id = "", payload, type } = JSON.parse(message.data);
const {
observer = null,
query = "",
variables = {},
startAckTimeoutId = 0,
subscriptionReadyCallback = null,
subscriptionFailedCallback = null
} = this.subscriptionObserverMap.get(id) || {};
logger({ id, observer, query, variables });
if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) {
if (observer) {
observer.next(payload);
} else {
logger(`observer not found for id: ${id}`);
}
return;
}
if (type === MESSAGE_TYPES.GQL_START_ACK) {
logger(`subscription ready for ${JSON.stringify({ query, variables })}`);
if (typeof subscriptionReadyCallback === "function") {
subscriptionReadyCallback();
}
clearTimeout(startAckTimeoutId as number);
if (observer) {
observer.next({
data: payload,
extensions: {
controlMsgType: "CONNECTED"
}
});
} else {
logger(`observer not found for id: ${id}`);
}
const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED;
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId: null,
subscriptionState,
subscriptionReadyCallback,
subscriptionFailedCallback
});
return;
}
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
clearTimeout(this.keepAliveTimeoutId);
this.keepAliveTimeoutId = setTimeout(
this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT),
this.keepAliveTimeout
);
return;
}
if (type === MESSAGE_TYPES.GQL_ERROR) {
const subscriptionState = SUBSCRIPTION_STATUS.FAILED;
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId,
subscriptionReadyCallback,
subscriptionFailedCallback,
subscriptionState
});
observer.error({
errors: [
{
...new GraphQLError(`Connection failed: ${JSON.stringify(payload)}`)
}
]
});
clearTimeout(startAckTimeoutId);
observer.complete();
if (typeof subscriptionFailedCallback === "function") {
subscriptionFailedCallback();
}
}
}