in sdk/webpubsub-socketio-extension/src/SIO/components/web-pubsub-adapter.ts [207:273]
public async broadcastWithAck(
packet: SioPacket,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: unknown[]) => void
): Promise<void> {
debug(`broadcastWithAck, start, packet = ${JSON.stringify(packet)},\
opts = ${toOptionsString(opts)}, namespace = "${this.nsp.name}"`);
let accumulatedData = "";
let count = 0;
const streamHandleResponse = (chunk: string) => {
const handleJsonLines = (lines: string[], onPacket: (SioPacket) => void) => {
/**
* Line 1: {xxx}
* Line 2: {xxx}
* ..
* Line N: {xx // maybe not complete
*/
for (let i = 0; i < lines.length - 1; i++) {
if (lines[i]) {
const emitWithAckResponse = JSON.parse(lines[i]);
// The payload is UTF-8 encoded EIO payload, we need to decode it and only ack the data
const eioPackets = EioParser.decodePayload(emitWithAckResponse.Payload);
this._sioDecoder.on("decoded", (packet: SioPacket) => onPacket(packet));
eioPackets.forEach((element) => {
this._sioDecoder.add(element.data);
});
this._sioDecoder.off("decoded");
}
}
};
accumulatedData += chunk.toString();
const lines = accumulatedData.split("\n");
handleJsonLines(lines, (packet: SioPacket) => {
ack(...packet.data);
count++;
});
accumulatedData = lines[lines.length - 1];
};
const bodyHandler = (value: Uint8Array | undefined, end: boolean) => {
if (value) {
const text = this._utf8Decoder.decode(value);
streamHandleResponse(text);
}
if (end) {
clientCountCallback(count);
return;
}
};
try {
packet.nsp = this.nsp.name;
const encodedPayload = await getSingleEioEncodedPayload(packet);
const oDataFilter = this._buildODataFilter(opts.rooms, opts.except);
await this.service.invoke(encodedPayload, bodyHandler, { filter: oDataFilter, contentType: "text/plain" });
debug(`broadcastWithAck, finish`);
} catch (e) {
debug(`broadcastWithAck, error, packet = ${JSON.stringify(packet)},\
opts = ${toOptionsString(opts)}, namespace = "${this.nsp.name}, error = ${e}"`);
}
}