public async aggregateOnce()

in firestore-counter/functions/src/controller.ts [137:217]


  /**
   * Runs one aggregation pass over the shards in `slice` inside a single
   * Firestore transaction.
   *
   * The pass is skipped when the controller document lists any workers, or
   * when the shard query fills up to `limit`. Otherwise every planned
   * aggregate document is read, updated with the value computed by
   * `Aggregator`, the consumed shards and partials are deleted, and the
   * controller document's `timestamp` is refreshed.
   *
   * @param slice Range of shards to consider; `slice.start` and `slice.end`
   *   are forwarded to `queryRange`.
   * @param limit Maximum number of shards to query. Reaching it is reported
   *   as `TOO_MANY_SHARDS` (assumes `queryRange` caps its result at `limit` —
   *   confirm against its definition).
   * @returns `WORKERS_RUNNING` or `TOO_MANY_SHARDS` when the pass is skipped,
   *   `SUCCESS` when the transaction commits, and `FAILURE` when anything
   *   inside the transaction throws. This method itself never rejects.
   */
  public async aggregateOnce(
    slice: Slice,
    limit: number
  ): Promise<ControllerStatus> {
    try {
      return await this.db.runTransaction(async (t) => {
        let controllerDoc = null;
        try {
          controllerDoc = await t.get(this.controllerDocRef);
        } catch (err) {
          logger.log(
            "Failed to read controller doc: " + this.controllerDocRef.path,
            err
          );
          throw new Error("Failed to read controller doc.");
        }
        const controllerData: ControllerData = controllerDoc.exists
          ? controllerDoc.data()
          : EMPTY_CONTROLLER_DATA;
        if (controllerData.workers.length > 0)
          return ControllerStatus.WORKERS_RUNNING;

        let shards: firestore.QuerySnapshot = null;
        try {
          shards = await t.get(
            queryRange(
              this.db,
              this.shardCollectionId,
              slice.start,
              slice.end,
              limit
            )
          );
        } catch (err) {
          logger.log("Query to find shards to aggregate failed.", err);
          throw new Error("Query to find shards to aggregate failed.");
        }
        // A full page means there may be more shards than were fetched, so
        // compare against the caller's limit rather than a fixed number.
        if (shards.docs.length >= limit) return ControllerStatus.TOO_MANY_SHARDS;

        const plans = Planner.planAggregations("", shards.docs);
        try {
          // Phase 1: validate every plan and read its aggregate document.
          // Firestore transactions require all reads to be issued before the
          // first write, so no writes are queued until every read has settled.
          const reads = await Promise.all(
            plans.map(async (plan) => {
              if (plan.isPartial) {
                throw new Error(
                  "Aggregation plan in controller run resulted in partial shard, " +
                    "this should never happen!"
                );
              }
              try {
                const counter: firestore.DocumentSnapshot = await t.get(
                  this.db.doc(plan.aggregate)
                );
                return { plan, counter };
              } catch (err) {
                logger.log("Failed to read document: " + plan.aggregate, err);
                throw new Error("Failed to read counter " + plan.aggregate);
              }
            })
          );

          // Phase 2: queue all writes.
          for (const { plan, counter } of reads) {
            // Calculate aggregated value and save to aggregate shard.
            const aggr = new Aggregator();
            const update = aggr.aggregate(counter, plan.partials, plan.shards);
            t.set(this.db.doc(plan.aggregate), update, { merge: true });
            // Delete shards that have been aggregated.
            plan.shards.forEach((snap) => t.delete(snap.ref));
            plan.partials.forEach((snap) => t.delete(snap.ref));
          }
        } catch (err) {
          logger.log("Some counter aggregation failed, bailing out.", err);
          throw new Error("Some counter aggregation failed, bailing out.");
        }

        t.set(
          this.controllerDocRef,
          { timestamp: firestore.FieldValue.serverTimestamp() },
          { merge: true }
        );
        logger.log("Aggregated " + plans.length + " counters.");
        return ControllerStatus.SUCCESS;
      });
    } catch (err) {
      // Any error thrown inside the transaction callback aborts it; report
      // the failure through the status instead of rejecting.
      logger.log("Transaction to aggregate shards failed.", err);
      return ControllerStatus.FAILURE;
    }
  }