public async aggregateContinuously()

in firestore-counter/functions/src/controller.ts [68:131]


  public async aggregateContinuously(
    slice: Slice,
    limit: number,
    timeoutMillis: number
  ) {
    return new Promise((resolve, reject) => {
      let aggrPromise: Promise<ControllerStatus> = null;
      let controllerData: ControllerData = EMPTY_CONTROLLER_DATA;
      let rounds = 0;
      let skippedRoundsDueToWorkers = 0;
      let shardsCount = 0;

      let unsubscribeControllerListener = this.controllerDocRef.onSnapshot(
        (snap) => {
          if (snap.exists) {
            controllerData = <ControllerData>snap.data();
          }
        }
      );

      let unsubscribeSliceListener = queryRange(
        this.db,
        this.shardCollectionId,
        slice.start,
        slice.end,
        limit
      ).onSnapshot(async (snap) => {
        if (snap.docs.length == limit) return;
        if (controllerData.workers.length > 0) {
          skippedRoundsDueToWorkers++;
          return;
        }
        if (aggrPromise === null) {
          aggrPromise = this.aggregateOnce(slice, limit);
          const status = await aggrPromise;
          aggrPromise = null;
          if (status === ControllerStatus.SUCCESS) {
            shardsCount += snap.docs.length;
            rounds++;
          }
        }
      });

      const shutdown = async () => {
        logger.log(
          "Successfully ran " +
            rounds +
            " rounds. Aggregated " +
            shardsCount +
            " shards."
        );
        logger.log(
          "Skipped " +
            skippedRoundsDueToWorkers +
            " rounds due to workers running."
        );
        unsubscribeControllerListener();
        unsubscribeSliceListener();
        if (aggrPromise === null) await aggrPromise;
        resolve();
      };
      setTimeout(shutdown, timeoutMillis);
    });
  }