in packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts [541:645]
private async _initializeHandshake({ awsRealTimeUrl }) {
logger(`Initializing handshake ${awsRealTimeUrl}`);
// Because connecting the socket is async, is waiting until connection is open
// Step 1: connect websocket
try {
await (() => {
return new Promise<void>((res, rej) => {
const newSocket = AppSyncRealTimeSubscriptionHandshakeLink.createWebSocket(awsRealTimeUrl, "graphql-ws");
newSocket.onerror = () => {
logger(`WebSocket connection error`);
};
newSocket.onclose = () => {
rej(new Error("Connection handshake error"));
};
newSocket.onopen = () => {
this.awsRealTimeSocket = newSocket;
return res();
};
});
})();
// Step 2: wait for ack from AWS AppSyncReaTime after sending init
await (() => {
return new Promise((res, rej) => {
let ackOk = false;
this.awsRealTimeSocket.onerror = error => {
logger(`WebSocket closed ${JSON.stringify(error)}`);
};
this.awsRealTimeSocket.onclose = event => {
logger(`WebSocket closed ${event.reason}`);
rej(new Error(JSON.stringify(event)));
};
this.awsRealTimeSocket.onmessage = (message: MessageEvent) => {
logger(
`subscription message from AWS AppSyncRealTime: ${message.data} `
);
const data = JSON.parse(message.data);
const {
type,
payload: { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = {}
} = data;
if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) {
ackOk = true;
this.keepAliveTimeout = connectionTimeoutMs;
this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind(
this
);
this.awsRealTimeSocket.onerror = err => {
logger(err);
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
this.awsRealTimeSocket.onclose = event => {
logger(`WebSocket closed ${event.reason}`);
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
res("Cool, connected to AWS AppSyncRealTime");
return;
}
if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) {
const {
payload: {
errors: [{ errorType = "", errorCode = 0 } = {}] = []
} = {}
} = data;
rej({ errorType, errorCode });
}
};
const gqlInit = {
type: MESSAGE_TYPES.GQL_CONNECTION_INIT
};
this.awsRealTimeSocket.send(JSON.stringify(gqlInit));
function checkAckOk() {
if (!ackOk) {
rej(
new Error(
`Connection timeout: ack from AWSRealTime was not received on ${CONNECTION_INIT_TIMEOUT} ms`
)
);
}
}
setTimeout(checkAckOk.bind(this), CONNECTION_INIT_TIMEOUT);
});
})();
} catch (err) {
const { errorType, errorCode } = err;
if (NON_RETRYABLE_CODES.indexOf(errorCode) >= 0) {
throw new NonRetryableError(errorType);
} else if (errorType) {
throw new Error(errorType);
} else {
throw err;
}
}
}