public async broadcastWithAck()

in sdk/webpubsub-socketio-extension/src/SIO/components/web-pubsub-adapter.ts [207:273]


  public async broadcastWithAck(
    packet: SioPacket,
    opts: BroadcastOptions,
    clientCountCallback: (clientCount: number) => void,
    ack: (...args: unknown[]) => void
  ): Promise<void> {
    debug(`broadcastWithAck, start, packet = ${JSON.stringify(packet)},\
  opts = ${toOptionsString(opts)}, namespace = "${this.nsp.name}"`);

    let accumulatedData = "";
    let count = 0;

    const streamHandleResponse = (chunk: string) => {
      const handleJsonLines = (lines: string[], onPacket: (SioPacket) => void) => {
        /**
         * Line 1: {xxx}
         * Line 2: {xxx}
         * ..
         * Line N: {xx   // maybe not complete
         */
        for (let i = 0; i < lines.length - 1; i++) {
          if (lines[i]) {
            const emitWithAckResponse = JSON.parse(lines[i]);
            // The payload is UTF-8 encoded EIO payload, we need to decode it and only ack the data
            const eioPackets = EioParser.decodePayload(emitWithAckResponse.Payload);
            this._sioDecoder.on("decoded", (packet: SioPacket) => onPacket(packet));
            eioPackets.forEach((element) => {
              this._sioDecoder.add(element.data);
            });
            this._sioDecoder.off("decoded");
          }
        }
      };

      accumulatedData += chunk.toString();
      const lines = accumulatedData.split("\n");
      handleJsonLines(lines, (packet: SioPacket) => {
        ack(...packet.data);
        count++;
      });
      accumulatedData = lines[lines.length - 1];
    };

    const bodyHandler = (value: Uint8Array | undefined, end: boolean) => {
      if (value) {
        const text = this._utf8Decoder.decode(value);
        streamHandleResponse(text);
      }
      if (end) {
        clientCountCallback(count);
        return;
      }
    };

    try {
      packet.nsp = this.nsp.name;
      const encodedPayload = await getSingleEioEncodedPayload(packet);
      const oDataFilter = this._buildODataFilter(opts.rooms, opts.except);

      await this.service.invoke(encodedPayload, bodyHandler, { filter: oDataFilter, contentType: "text/plain" });

      debug(`broadcastWithAck, finish`);
    } catch (e) {
      debug(`broadcastWithAck, error, packet = ${JSON.stringify(packet)},\
opts = ${toOptionsString(opts)}, namespace = "${this.nsp.name}, error = ${e}"`);
    }
  }