in sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts [563:770]
/**
 * Creates a WebSocket client for the given URI and wires up its open / error / close /
 * message handlers.
 *
 * The returned promise resolves once the socket has opened and the client state has been
 * switched to `Connected`. It rejects when:
 * - the socket reports an error,
 * - the socket closes while the client state is not `Connected`, or
 * - `stop()` was requested before the socket finished opening (the socket is closed again).
 *
 * @param uri - The URI passed to the WebSocket client factory.
 * @returns A promise that settles according to the rules above.
 * @throws Error - synchronously, if the client is currently stopping.
 */
private _connectCore(uri: string): Promise<void> {
  if (this._isStopping) {
    throw new Error("Can't start a client during stopping");
  }

  return new Promise<void>((resolve, reject) => {
    // This part is executed sync
    const client = (this._wsClient = this._getWebSocketClientFactory().create(
      uri,
      this._protocol.name,
    ));

    client.onopen(() => {
      // There's a case that client called stop() before this method. We need to check and close it if it's the case.
      if (this._isStopping) {
        try {
          client.close();
        } catch {
          /** best-effort close; nothing useful can be done if it fails */
        }
        reject(new Error(`The client is stopped`));
        // Must stop here: otherwise the state would flip to Connected and a sequence-ack
        // task would be started on a socket that has just been closed.
        return;
      }

      logger.verbose("WebSocket connection has opened");
      this._changeState(WebPubSubClientState.Connected);

      if (this._protocol.isReliableSubProtocol) {
        // Replace any task left over from a previous connection.
        if (this._sequenceAckTask != null) {
          this._sequenceAckTask.abort();
        }
        // NOTE(review): the second argument is presumably the interval in milliseconds — confirm against AbortableTask.
        this._sequenceAckTask = new AbortableTask(async () => {
          await this._trySendSequenceAck();
        }, 1000);
      }

      resolve();
    });

    client.onerror((e) => {
      if (this._sequenceAckTask != null) {
        this._sequenceAckTask.abort();
      }
      // No-op if the promise has already been settled by onopen.
      reject(e);
    });

    client.onclose((code: number, reason: string) => {
      // The Connected state is only entered from onopen above, so it tells apart
      // "closed after open" from "never managed to open".
      if (this._state === WebPubSubClientState.Connected) {
        logger.verbose("WebSocket closed after open");
        if (this._sequenceAckTask != null) {
          this._sequenceAckTask.abort();
        }
        logger.info(`WebSocket connection closed. Code: ${code}, Reason: ${reason}`);
        this._lastCloseEvent = { code: code, reason: reason };
        this._handleConnectionClose();
      } else {
        logger.verbose("WebSocket closed before open");
        reject(new Error(`Failed to start WebSocket: ${code}`));
      }
    });

    client.onmessage((data: any) => {
      // Settles the pending send operation that is waiting on this ack id, if any.
      const handleAckMessage = (message: AckMessage): void => {
        const entity = this._ackMap.get(message.ackId);
        if (entity == null) {
          return;
        }
        this._ackMap.delete(message.ackId);

        // A "Duplicate" error is reported to the caller as a successful, duplicated send.
        const isDuplicated: boolean =
          message.error != null && message.error.name === "Duplicate";
        if (message.success || isDuplicated) {
          entity.resolve({
            ackId: message.ackId,
            isDuplicated: isDuplicated,
          } as WebPubSubResult);
        } else {
          entity.reject(
            new SendMessageError("Failed to send message.", {
              ackId: message.ackId,
              errorDetail: message.error,
            } as SendMessageErrorOptions),
          );
        }
      };

      // Records the connection identity and, for the first "connected" message only,
      // optionally rejoins groups marked as joined and then emits the connected event.
      const handleConnectedMessage = async (message: ConnectedMessage): Promise<void> => {
        this._connectionId = message.connectionId;
        this._reconnectionToken = message.reconnectionToken;

        if (this._isInitialConnected) {
          return;
        }
        this._isInitialConnected = true;

        if (this._options.autoRejoinGroups) {
          const groupPromises: Promise<void>[] = [];
          this._groupMap.forEach((g) => {
            if (g.isJoined) {
              groupPromises.push(
                (async () => {
                  try {
                    await this._joinGroupCore(g.name);
                  } catch (err) {
                    this._safeEmitRejoinGroupFailed(g.name, err);
                  }
                })(),
              );
            }
          });
          await Promise.all(groupPromises).catch(() => {
            /** empty */
          });
        }

        this._safeEmitConnected(message.connectionId, message.userId);
      };

      // Remembers the last "disconnected" message received from the service.
      const handleDisconnectedMessage = (message: DisconnectedMessage): void => {
        this._lastDisconnectedMessage = message;
      };

      // Updates sequence tracking for a data message; returns false when the message is a
      // duplicate and must be dropped.
      const trackSequence = (message: GroupDataMessage | ServerDataMessage): boolean => {
        if (message.sequenceId != null) {
          const diff = this._sequenceId.tryUpdate(message.sequenceId);
          if (diff === 0) {
            // drop duplicated message
            return false;
          }
          // If the diff is larger than the threshold, we must ack quicker to avoid slow client drop.
          if (diff > this._quickSequenceAckDiff) {
            this._trySendSequenceAck();
          }
        }
        return true;
      };

      // Emits a group data message unless it is a duplicate.
      const handleGroupDataMessage = (message: GroupDataMessage): void => {
        if (trackSequence(message)) {
          this._safeEmitGroupMessage(message);
        }
      };

      // Emits a server data message unless it is a duplicate.
      const handleServerDataMessage = (message: ServerDataMessage): void => {
        if (trackSequence(message)) {
          this._safeEmitServerMessage(message);
        }
      };

      let messages: WebPubSubMessage[] | WebPubSubMessage | null;
      try {
        let convertedData: Buffer | ArrayBuffer | string;
        if (Array.isArray(data)) {
          // Payload delivered as an array of chunks: merge into a single buffer.
          convertedData = Buffer.concat(data);
        } else {
          convertedData = data;
        }
        messages = this._protocol.parseMessages(convertedData);
        if (messages === null) {
          // null means the message is not recognized.
          return;
        }
      } catch (err) {
        logger.warning("An error occurred while parsing the message from service", err);
        throw err;
      }

      if (!Array.isArray(messages)) {
        messages = [messages];
      }

      messages.forEach((message) => {
        try {
          switch (message.kind) {
            case "ack": {
              handleAckMessage(message as AckMessage);
              break;
            }
            case "connected": {
              // The handler is async: a rejection would bypass the surrounding try/catch,
              // so it is handled explicitly to avoid an unhandled promise rejection.
              handleConnectedMessage(message as ConnectedMessage).catch((err: unknown) => {
                logger.warning(
                  `An error occurred while handling the message with kind: ${message.kind} from service`,
                  err,
                );
              });
              break;
            }
            case "disconnected": {
              handleDisconnectedMessage(message as DisconnectedMessage);
              break;
            }
            case "groupData": {
              handleGroupDataMessage(message as GroupDataMessage);
              break;
            }
            case "serverData": {
              handleServerDataMessage(message as ServerDataMessage);
              break;
            }
          }
        } catch (err) {
          logger.warning(
            `An error occurred while handling the message with kind: ${message.kind} from service`,
            err,
          );
        }
      });
    });
  });
}