in firestore-counter/functions/src/controller.ts [137:217]
/**
 * Runs one aggregation pass over the shards in `slice` inside a single
 * Firestore transaction.
 *
 * The transaction reads the controller document, queries up to `limit`
 * shards in the slice, folds them into their aggregate counter documents,
 * deletes the consumed shards/partials and finally stamps the controller
 * document with a server timestamp.
 *
 * @param slice Range (`start`/`end`) of shard paths to query.
 * @param limit Maximum number of shards fetched by the query; reaching it
 *   is reported as `TOO_MANY_SHARDS` and nothing is aggregated.
 * @returns `WORKERS_RUNNING` if the controller document lists any workers,
 *   `TOO_MANY_SHARDS` if the query returned `limit` (or more) shards,
 *   `SUCCESS` once the aggregation has been committed, and `FAILURE` if
 *   anything inside the transaction threw. Errors are logged, not rethrown.
 */
public async aggregateOnce(
  slice: Slice,
  limit: number
): Promise<ControllerStatus> {
  try {
    const status = await this.db.runTransaction(async (t) => {
      let controllerDoc = null;
      try {
        controllerDoc = await t.get(this.controllerDocRef);
      } catch (err) {
        // Include the underlying error so the root cause is not lost.
        logger.log(
          "Failed to read controller doc: " + this.controllerDocRef.path,
          err
        );
        throw Error("Failed to read controller doc.");
      }
      const controllerData: ControllerData = controllerDoc.exists
        ? controllerDoc.data()
        : EMPTY_CONTROLLER_DATA;
      // Registered workers take precedence; leave the aggregation to them.
      if (controllerData.workers.length > 0)
        return ControllerStatus.WORKERS_RUNNING;

      let shards: firestore.QuerySnapshot = null;
      try {
        shards = await t.get(
          queryRange(
            this.db,
            this.shardCollectionId,
            slice.start,
            slice.end,
            limit
          )
        );
      } catch (err) {
        logger.log("Query to find shards to aggregate failed.", err);
        throw Error("Query to find shards to aggregate failed.");
      }
      // A full page means the query was cut off by `limit`. Compare against
      // the caller-supplied limit instead of a hard-coded 200 so the check
      // stays correct for any page size.
      if (shards.docs.length >= limit) return ControllerStatus.TOO_MANY_SHARDS;

      const plans = Planner.planAggregations("", shards.docs);
      // NOTE: each async callback runs synchronously up to its first
      // `await`, so every `t.get` below is issued during `map`, before any
      // `t.set`/`t.delete` is queued. Keep this shape: Firestore
      // transactions are documented to require reads before writes.
      const promises = plans.map(async (plan) => {
        if (plan.isPartial) {
          throw Error(
            "Aggregation plan in controller run resulted in partial shard, " +
              "this should never happen!"
          );
        }
        let counter: firestore.DocumentSnapshot = null;
        try {
          counter = await t.get(this.db.doc(plan.aggregate));
        } catch (err) {
          logger.log("Failed to read document: " + plan.aggregate, err);
          throw Error("Failed to read counter " + plan.aggregate);
        }
        // Calculate aggregated value and save to aggregate shard.
        const aggr = new Aggregator();
        const update = aggr.aggregate(counter, plan.partials, plan.shards);
        t.set(this.db.doc(plan.aggregate), update, { merge: true });
        // Delete shards that have been aggregated.
        plan.shards.forEach((snap) => t.delete(snap.ref));
        plan.partials.forEach((snap) => t.delete(snap.ref));
      });
      try {
        await Promise.all(promises);
      } catch (err) {
        // Log the first rejection so the failing counter can be identified.
        logger.log("Some counter aggregation failed, bailing out.", err);
        throw Error("Some counter aggregation failed, bailing out.");
      }
      // Record when the controller last completed an aggregation pass.
      t.set(
        this.controllerDocRef,
        { timestamp: firestore.FieldValue.serverTimestamp() },
        { merge: true }
      );
      logger.log("Aggregated " + plans.length + " counters.");
      return ControllerStatus.SUCCESS;
    });
    return status;
  } catch (err) {
    // Any error thrown inside the transaction surfaces as FAILURE.
    logger.log("Transaction to aggregate shards failed.", err);
    return ControllerStatus.FAILURE;
  }
}