in sdk/webpubsub-socketio-extension/src/EIO/components/web-pubsub-transport.ts [63:108]
/**
 * Queues `packets` and sends everything currently buffered through Web PubSub.
 *
 * While the transport has not been opened yet, the first buffered packet must
 * be an `open` packet: its encoded payload (minus the first character) is
 * handed to `onAcceptEioConnection`. Any other first packet causes
 * `onRefuseEioConnection` to be called with an explanatory message.
 *
 * The remaining buffer is sent in batches whose size is chosen by
 * `_getPacketNumberForNextSend`; if that helper throws, the whole buffer is
 * sent as a single batch. On success a `drain` event is emitted.
 *
 * @param packets - Engine.IO packets to append to the send buffer.
 * @returns A promise that resolves once the buffer has been flushed and
 *          `drain` has been emitted. It rejects if encoding or sending fails;
 *          `writable` is restored to `true` in that case as well.
 */
public override async send(packets: EioPacket[]): Promise<void> {
  debug(
    `send packets, packets.length = ${packets.length}, packets = ${JSON.stringify(packets)},` +
      `_buffer.length=${this._buffer.length}, _buffer=${JSON.stringify(this._buffer)}`
  );
  this.writable = false;
  try {
    this._buffer.push(...packets);

    if (this._buffer.length > 0 && !this._opened) {
      const firstPacket = this._buffer.shift();
      if (firstPacket.type === "open") {
        const payload = await this._encodeEioPacketAsync(firstPacket, false);
        debug(`first packet is 'open' packet, payload = ${payload}`);
        // The leading character of the encoded payload is dropped before it is handed over.
        this.clientConnectionContext.onAcceptEioConnection(payload.substring(1));
        this._opened = true;
      } else {
        // Serialize explicitly: interpolating the object itself yields "[object Object]".
        const errorMessage = `First packet must be a valid packet whose type is 'open', got packet = ${JSON.stringify(
          firstPacket
        )}.`;
        debug(errorMessage);
        this.clientConnectionContext.onRefuseEioConnection(errorMessage);
        // NOTE(review): packets still in the buffer are sent below even after a refusal — confirm this is intended.
      }
    }

    while (this._buffer.length > 0) {
      let sentNumber = 0;
      try {
        sentNumber = this._getPacketNumberForNextSend(this._buffer);
      } catch (error) {
        // Narrow before reading `.message`; the thrown value is not guaranteed to be an Error.
        const reason = error instanceof Error ? error.message : String(error);
        debug(
          `send, internal error, inside _getPacketNumberForNextSend, error = ${reason},` +
            `_buffer=${JSON.stringify(this._buffer)}`
        );
        // Fall back to sending everything that is buffered in one batch.
        sentNumber = this._buffer.length;
      }
      if (sentNumber <= 0) break;
      const payloads = await this._encodeEioPayloadAsync(this._buffer.splice(0, sentNumber));
      await this._webPubSubSend(payloads);
    }
  } finally {
    // Always restore writability so a failed send does not leave the transport stuck.
    this.writable = true;
  }
  // The transport's `drain` event is bound to the `flush` method of the owning `Socket`.
  debug(`send, emit drain`);
  this.emit("drain");
  debug(`send, finish, _buffer.length=${this._buffer.length}, _buffer=${JSON.stringify(this._buffer)}`);
}