public run()

in firestore-counter/functions/src/worker.ts [75:167]


  /**
   * Runs the worker until one of its stop conditions is met.
   *
   * While running, the worker:
   *  - calls `maybeAggregate()` once per second;
   *  - listens to the shard slice described by `this.metadata.slice` and keeps
   *    `this.shards` in sync with the latest snapshot;
   *  - listens to its metadata document.
   *
   * The worker stops when:
   *  - `WORKER_TIMEOUT_MS` elapses (stats are written);
   *  - the metadata document disappears or no longer equals `this.metadata`
   *    (stats are NOT written);
   *  - `singleRun` is set and the slice snapshot contains no shards (stats are
   *    written).
   *
   * Stopping clears both timers, detaches both listeners and waits for any
   * in-flight aggregation to settle before the returned promise settles.
   *
   * @returns A promise that resolves once the worker has shut down (and, where
   *   applicable, attempted to persist its stats).
   */
  public run(): Promise<void> {
    return new Promise((resolve, reject) => {
      // Listener handles are assigned after `shutdown` is defined, so they are
      // modelled as possibly-undefined and checked before being invoked.
      let unsubscribeMetadataListener: (() => void) | undefined;
      let unsubscribeSliceListener: (() => void) | undefined;
      // Ensures the shutdown sequence (and the stats write) runs at most once,
      // no matter how many stop conditions fire.
      let stopping = false;

      const shutdown = async (): Promise<void> => {
        clearInterval(intervalTimer);
        clearTimeout(timeoutTimer);
        if (unsubscribeMetadataListener) {
          unsubscribeMetadataListener();
        }
        if (unsubscribeSliceListener) {
          unsubscribeSliceListener();
        }
        if (this.aggregation != null) {
          try {
            // Let the in-flight aggregation finish before reporting completion.
            await this.aggregation;
          } catch (err) {
            // Not much here we can do, transaction is over.
          }
        }
      };

      // Best-effort: every failure is logged and swallowed so that a stats
      // write can never reject the promise returned by run().
      const writeStats = async (): Promise<void> => {
        // Sorts in place; every 100th path (index 100, 200, ...) is recorded as
        // a split point.
        this.allPaths.sort();
        const splits = this.allPaths.filter(
          (val, idx) => idx !== 0 && idx % 100 === 0
        );
        const stats: WorkerStats = {
          shardsAggregated: this.allPaths.length,
          splits: splits,
          lastSuccessfulRun: Date.now(),
          rounds: this.rounds,
          roundsCapped: this.roundsCapped,
        };
        try {
          await this.db.runTransaction(async (t) => {
            try {
              const snap = await t.get(this.metadoc.ref);
              // Only write if the metadata document is still the one this
              // worker was started with.
              if (snap.exists && deepEqual(snap.data(), this.metadata)) {
                t.update(snap.ref, {
                  timestamp: firestore.FieldValue.serverTimestamp(),
                  stats: stats,
                });
              }
            } catch (err) {
              logger.log("Failed to save writer stats.", err);
            }
          });
        } catch (err) {
          logger.log("Failed to save writer stats.", err);
        }
      };

      // Single exit path shared by every stop condition.
      const finish = (saveStats: boolean): void => {
        if (stopping) {
          return;
        }
        stopping = true;
        const done = saveStats ? shutdown().then(writeStats) : shutdown();
        done.then(resolve).catch(reject);
      };

      const intervalTimer = setInterval(() => {
        this.maybeAggregate();
      }, 1000);

      const timeoutTimer = setTimeout(() => finish(true), WORKER_TIMEOUT_MS);

      unsubscribeMetadataListener = this.metadoc.ref.onSnapshot((snap) => {
        // if something's changed in the worker metadata since we were called, abort.
        if (!snap.exists || !deepEqual(snap.data(), this.metadata)) {
          logger.log("Shutting down because metadoc changed.");
          finish(false);
        }
      });

      unsubscribeSliceListener = queryRange(
        this.db,
        this.shardCollection,
        this.metadata.slice.start,
        this.metadata.slice.end,
        SHARDS_LIMIT
      ).onSnapshot((snap) => {
        this.shards = snap.docs;
        if (this.singleRun && this.shards.length === 0) {
          logger.log("Shutting down, single run mode.");
          finish(true);
        }
      });
    });
  }