maybeAggregate()

in firestore-counter/functions/lib/worker.js [134:212]


    maybeAggregate() {
        if (this.aggregation != null || this.shards === null)
            return;
        this.rounds++;
        if (this.shards.length === SHARDS_LIMIT)
            this.roundsCapped++;
        // Identify partial shards that are candidates for cleanup.
        const [toAggregate, toCleanup] = ShardedCounterWorker.categorizeShards(this.shards, this.singleRun);
        const cleanupPromises = ShardedCounterWorker.cleanupPartials(this.db, toCleanup);
        const plans = planner_1.Planner.planAggregations(this.metadata.slice.start, toAggregate);
        const promises = plans.map(async (plan) => {
            try {
                const paths = await this.db.runTransaction(async (t) => {
                    const paths = [];
                    // Read metadata document in transaction to guarantee ownership of the slice.
                    const metadocPromise = t.get(this.metadoc.ref);
                    const counterPromise = plan.isPartial
                        ? Promise.resolve(null)
                        : t.get(this.db.doc(plan.aggregate));
                    // Read all shards in a transaction since we want to delete them immediately.
                    // Note that partials are not read here, because we use array transform to
                    // update them and don't need transaction guarantees.
                    const shardRefs = plan.shards.map((snap) => snap.ref);
                    const shardsPromise = shardRefs.length > 0
                        ? t.getAll(shardRefs[0], ...shardRefs.slice(1))
                        : Promise.resolve([]);
                    let shards;
                    let counter;
                    let metadoc;
                    try {
                        [shards, counter, metadoc] = await Promise.all([
                            shardsPromise,
                            counterPromise,
                            metadocPromise,
                        ]);
                    }
                    catch (err) {
                        firebase_functions_1.logger.log("Unable to read shards during aggregation round, skipping...", err);
                        return [];
                    }
                    // Check that we still own the slice.
                    if (!metadoc.exists || !deepEqual(metadoc.data(), this.metadata)) {
                        firebase_functions_1.logger.log("Metadata has changed, bailing out...");
                        return [];
                    }
                    // Calculate aggregated value and save to aggregate shard.
                    const aggr = new aggregator_1.Aggregator();
                    const update = aggr.aggregate(counter, plan.partials, shards);
                    t.set(this.db.doc(plan.aggregate), update, { merge: true });
                    // Delete shards that have been aggregated.
                    shards.forEach((snap) => {
                        if (snap.exists) {
                            paths.push(snap.ref.path);
                            t.delete(snap.ref);
                        }
                    });
                    // Decrement partials by the amount that have been aggregated.
                    plan.partials.forEach((snap) => {
                        if (snap.exists) {
                            const decrement = aggr.subtractPartial(snap);
                            t.set(snap.ref, decrement, { merge: true });
                        }
                    });
                    return paths;
                });
                this.allPaths.push(...paths);
            }
            catch (err) {
                firebase_functions_1.logger.log("transaction to: " + plan.aggregate + " failed, skipping...", err);
            }
        });
        if (promises.length === 0 && cleanupPromises.length === 0)
            return;
        this.aggregation = Promise.all(promises.concat(cleanupPromises)).then(() => {
            // once this aggregation is done mark it as such
            this.aggregation = null;
            return;
        });
    }