protected maybeAggregate()

in firestore-counter/functions/src/worker.ts [169:273]
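
Runs a single aggregation round: it categorizes the shards fetched so far, schedules cleanup of partial shards, plans the aggregation transactions, and stores the in-flight work in this.aggregation so that at most one round runs at a time.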


  protected maybeAggregate() {
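    // Bail out if a round is already in flight or shards haven't been fetched yet.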
    if (this.aggregation != null || this.shards === null) return;

    // Track round statistics; a round is "capped" when the shard query returned a full SHARDS_LIMIT batch.
    this.rounds++;
    if (this.shards.length === SHARDS_LIMIT) this.roundsCapped++;

    // Split the fetched shards into those ready to aggregate and partial shards that are candidates for cleanup.
    const [toAggregate, toCleanup] = ShardedCounterWorker.categorizeShards(
      this.shards,
      this.singleRun
    );

    // Schedule cleanup of the partial shards identified above.
    const cleanupPromises = ShardedCounterWorker.cleanupPartials(
      this.db,
      toCleanup
    );

    // Plan which shards and partials roll up into which aggregate document.
    const plans = Planner.planAggregations(
      this.metadata.slice.start,
      toAggregate
    );

    // Run one transaction per plan; a failed transaction is logged and skipped.
    const promises = plans.map(async (plan) => {
      try {
        const paths = await this.db.runTransaction(async (t) => {
          const paths: string[] = [];

          // Read metadata document in transaction to guarantee ownership of the slice.
          const metadocPromise = t.get(this.metadoc.ref);

          // Read the destination counter only for full aggregations; partial plans pass null to the aggregator.
          const counterPromise = plan.isPartial
            ? Promise.resolve(null)
            : t.get(this.db.doc(plan.aggregate));

          // Read all shards in a transaction since we want to delete them immediately.
          // Note that partials are not read here, because we use array transform to
          // update them and don't need transaction guarantees.
          const shardRefs = plan.shards.map((snap) => snap.ref);
          const shardsPromise =
            shardRefs.length > 0
              ? t.getAll(shardRefs[0], ...shardRefs.slice(1))
              : Promise.resolve([]);
          let shards: firestore.DocumentSnapshot[];
          let counter: firestore.DocumentSnapshot | null;
          let metadoc: firestore.DocumentSnapshot;
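          // Await all three reads together; a failure skips this plan rather than failing the round.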
          try {
            [shards, counter, metadoc] = await Promise.all([
              shardsPromise,
              counterPromise,
              metadocPromise,
            ]);
          } catch (err) {
            logger.log(
              "Unable to read shards during aggregation round, skipping...",
              err
            );
            return [];
          }

          // Check that we still own the slice.
          if (!metadoc.exists || !deepEqual(metadoc.data(), this.metadata)) {
            logger.log("Metadata has changed, bailing out...");
            return [];
          }

          // Calculate aggregated value and save to aggregate shard.
          const aggr = new Aggregator();
          const update = aggr.aggregate(counter, plan.partials, shards);
          t.set(this.db.doc(plan.aggregate), update, { merge: true });

          // Delete shards that have been aggregated.
          shards.forEach((snap) => {
            if (snap.exists) {
              paths.push(snap.ref.path);
              t.delete(snap.ref);
            }
          });

          // Decrement each partial by the amount that was just aggregated from it.
          plan.partials.forEach((snap) => {
            if (snap.exists) {
              const decrement = aggr.subtractPartial(snap);
              t.set(snap.ref, decrement, { merge: true });
            }
          });
          return paths;
        });
        this.allPaths.push(...paths);
      } catch (err) {
        logger.log(
          "transaction to: " + plan.aggregate + " failed, skipping...",
          err
        );
      }
    });
    // Nothing to aggregate or clean up this round.
    if (promises.length === 0 && cleanupPromises.length === 0) return;

    this.aggregation = Promise.all(promises.concat(cleanupPromises)).then(
      () => {
        // Once this round finishes, clear the marker so the next round can start.
        this.aggregation = null;
      }
    );
  }
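
The final lines implement a small reentrancy pattern: this.aggregation serves both as the in-flight promise and as the guard checked at the top of the method. Below is a minimal, self-contained sketch of that pattern in TypeScript; Debouncer and maybeDoWork are hypothetical names used for illustration, not part of the extension.

  class Debouncer {
    private inFlight: Promise<void> | null = null;

    // Start work only if no previous run is still pending.
    maybeDoWork(work: () => Promise<void>): void {
      if (this.inFlight !== null) return; // A round is already running.
      this.inFlight = work().finally(() => {
        // Clear the marker so a later call can start a fresh round.
        this.inFlight = null;
      });
    }
  }

  // Usage: only the first call starts work; later calls are no-ops until it settles.
  const d = new Debouncer();
  d.maybeDoWork(async () => {
    /* aggregate... */
  });
  d.maybeDoWork(async () => {
    /* skipped while the first run is pending */
  });

Note that the worker clears the marker in .then() rather than .finally(), so a rejected round would leave this.aggregation set; the per-plan try/catch above makes that unlikely for the aggregation transactions, since each failure is caught and logged individually.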