in firestore-counter/functions/lib/worker.js [134:212]
maybeAggregate() {
if (this.aggregation != null || this.shards === null)
return;
this.rounds++;
if (this.shards.length === SHARDS_LIMIT)
this.roundsCapped++;
// Identify partial shards that are candidates for cleanup.
const [toAggregate, toCleanup] = ShardedCounterWorker.categorizeShards(this.shards, this.singleRun);
const cleanupPromises = ShardedCounterWorker.cleanupPartials(this.db, toCleanup);
const plans = planner_1.Planner.planAggregations(this.metadata.slice.start, toAggregate);
const promises = plans.map(async (plan) => {
try {
const paths = await this.db.runTransaction(async (t) => {
const paths = [];
// Read metadata document in transaction to guarantee ownership of the slice.
const metadocPromise = t.get(this.metadoc.ref);
const counterPromise = plan.isPartial
? Promise.resolve(null)
: t.get(this.db.doc(plan.aggregate));
// Read all shards in a transaction since we want to delete them immediately.
// Note that partials are not read here, because we use array transform to
// update them and don't need transaction guarantees.
const shardRefs = plan.shards.map((snap) => snap.ref);
const shardsPromise = shardRefs.length > 0
? t.getAll(shardRefs[0], ...shardRefs.slice(1))
: Promise.resolve([]);
let shards;
let counter;
let metadoc;
try {
[shards, counter, metadoc] = await Promise.all([
shardsPromise,
counterPromise,
metadocPromise,
]);
}
catch (err) {
firebase_functions_1.logger.log("Unable to read shards during aggregation round, skipping...", err);
return [];
}
// Check that we still own the slice.
if (!metadoc.exists || !deepEqual(metadoc.data(), this.metadata)) {
firebase_functions_1.logger.log("Metadata has changed, bailing out...");
return [];
}
// Calculate aggregated value and save to aggregate shard.
const aggr = new aggregator_1.Aggregator();
const update = aggr.aggregate(counter, plan.partials, shards);
t.set(this.db.doc(plan.aggregate), update, { merge: true });
// Delete shards that have been aggregated.
shards.forEach((snap) => {
if (snap.exists) {
paths.push(snap.ref.path);
t.delete(snap.ref);
}
});
// Decrement partials by the amount that have been aggregated.
plan.partials.forEach((snap) => {
if (snap.exists) {
const decrement = aggr.subtractPartial(snap);
t.set(snap.ref, decrement, { merge: true });
}
});
return paths;
});
this.allPaths.push(...paths);
}
catch (err) {
firebase_functions_1.logger.log("transaction to: " + plan.aggregate + " failed, skipping...", err);
}
});
if (promises.length === 0 && cleanupPromises.length === 0)
return;
this.aggregation = Promise.all(promises.concat(cleanupPromises)).then(() => {
// once this aggregation is done mark it as such
this.aggregation = null;
return;
});
}