in firestore-counter/functions/lib/controller.js [104:169]
async aggregateOnce(slice, limit) {
try {
const status = await this.db.runTransaction(async (t) => {
let controllerDoc = null;
try {
controllerDoc = await t.get(this.controllerDocRef);
}
catch (err) {
firebase_functions_1.logger.log("Failed to read controller doc: " + this.controllerDocRef.path);
throw Error("Failed to read controller doc.");
}
const controllerData = controllerDoc.exists
? controllerDoc.data()
: EMPTY_CONTROLLER_DATA;
if (controllerData.workers.length > 0)
return ControllerStatus.WORKERS_RUNNING;
let shards = null;
try {
shards = await t.get(common_1.queryRange(this.db, this.shardCollectionId, slice.start, slice.end, limit));
}
catch (err) {
firebase_functions_1.logger.log("Query to find shards to aggregate failed.", err);
throw Error("Query to find shards to aggregate failed.");
}
if (shards.docs.length == 200)
return ControllerStatus.TOO_MANY_SHARDS;
const plans = planner_1.Planner.planAggregations("", shards.docs);
const promises = plans.map(async (plan) => {
if (plan.isPartial) {
throw Error("Aggregation plan in controller run resulted in partial shard, " +
"this should never happen!");
}
let counter = null;
try {
counter = await t.get(this.db.doc(plan.aggregate));
}
catch (err) {
firebase_functions_1.logger.log("Failed to read document: " + plan.aggregate, err);
throw Error("Failed to read counter " + plan.aggregate);
}
// Calculate aggregated value and save to aggregate shard.
const aggr = new aggregator_1.Aggregator();
const update = aggr.aggregate(counter, plan.partials, plan.shards);
t.set(this.db.doc(plan.aggregate), update, { merge: true });
// Delete shards that have been aggregated.
plan.shards.forEach((snap) => t.delete(snap.ref));
plan.partials.forEach((snap) => t.delete(snap.ref));
});
try {
await Promise.all(promises);
}
catch (err) {
firebase_functions_1.logger.log("Some counter aggregation failed, bailing out.");
throw Error("Some counter aggregation failed, bailing out.");
}
t.set(this.controllerDocRef, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
firebase_functions_1.logger.log("Aggregated " + plans.length + " counters.");
return ControllerStatus.SUCCESS;
});
return status;
}
catch (err) {
firebase_functions_1.logger.log("Transaction to aggregate shards failed.", err);
return ControllerStatus.FAILURE;
}
}