async aggregateOnce()

in firestore-counter/functions/lib/controller.js [104:169]


    async aggregateOnce(slice, limit) {
        try {
            const status = await this.db.runTransaction(async (t) => {
                let controllerDoc = null;
                try {
                    controllerDoc = await t.get(this.controllerDocRef);
                }
                catch (err) {
                    firebase_functions_1.logger.log("Failed to read controller doc: " + this.controllerDocRef.path);
                    throw Error("Failed to read controller doc.");
                }
                const controllerData = controllerDoc.exists
                    ? controllerDoc.data()
                    : EMPTY_CONTROLLER_DATA;
                if (controllerData.workers.length > 0)
                    return ControllerStatus.WORKERS_RUNNING;
                let shards = null;
                try {
                    shards = await t.get(common_1.queryRange(this.db, this.shardCollectionId, slice.start, slice.end, limit));
                }
                catch (err) {
                    firebase_functions_1.logger.log("Query to find shards to aggregate failed.", err);
                    throw Error("Query to find shards to aggregate failed.");
                }
                if (shards.docs.length == 200)
                    return ControllerStatus.TOO_MANY_SHARDS;
                const plans = planner_1.Planner.planAggregations("", shards.docs);
                const promises = plans.map(async (plan) => {
                    if (plan.isPartial) {
                        throw Error("Aggregation plan in controller run resulted in partial shard, " +
                            "this should never happen!");
                    }
                    let counter = null;
                    try {
                        counter = await t.get(this.db.doc(plan.aggregate));
                    }
                    catch (err) {
                        firebase_functions_1.logger.log("Failed to read document: " + plan.aggregate, err);
                        throw Error("Failed to read counter " + plan.aggregate);
                    }
                    // Calculate aggregated value and save to aggregate shard.
                    const aggr = new aggregator_1.Aggregator();
                    const update = aggr.aggregate(counter, plan.partials, plan.shards);
                    t.set(this.db.doc(plan.aggregate), update, { merge: true });
                    // Delete shards that have been aggregated.
                    plan.shards.forEach((snap) => t.delete(snap.ref));
                    plan.partials.forEach((snap) => t.delete(snap.ref));
                });
                try {
                    await Promise.all(promises);
                }
                catch (err) {
                    firebase_functions_1.logger.log("Some counter aggregation failed, bailing out.");
                    throw Error("Some counter aggregation failed, bailing out.");
                }
                t.set(this.controllerDocRef, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
                firebase_functions_1.logger.log("Aggregated " + plans.length + " counters.");
                return ControllerStatus.SUCCESS;
            });
            return status;
        }
        catch (err) {
            firebase_functions_1.logger.log("Transaction to aggregate shards failed.", err);
            return ControllerStatus.FAILURE;
        }
    }