in firestore-counter/functions/src/worker.ts [169:273]
/**
 * Kicks off one aggregation round, unless a round is already in flight
 * (`this.aggregation` is set) or `this.shards` is null.
 *
 * The round:
 *  1. splits the current shards into ones to aggregate and partials to clean up,
 *  2. starts cleanup of those partials,
 *  3. asks the Planner for aggregation plans and runs each plan in its own
 *     transaction (read metadata + counter + shards, write the aggregate,
 *     delete the aggregated shards, decrement the partials).
 *
 * Paths of shards deleted by a successful plan are appended to `this.allPaths`.
 * A failing plan is logged and skipped; it does not fail the round.
 *
 * While the round is running `this.aggregation` holds its promise; it is reset
 * to null when the round settles, whether it fulfils or rejects.
 */
protected maybeAggregate() {
  if (this.aggregation != null || this.shards === null) return;

  this.rounds++;
  if (this.shards.length === SHARDS_LIMIT) this.roundsCapped++;

  // Identify partial shards that are candidates for cleanup.
  const [toAggregate, toCleanup] = ShardedCounterWorker.categorizeShards(
    this.shards,
    this.singleRun
  );
  const cleanupPromises = ShardedCounterWorker.cleanupPartials(
    this.db,
    toCleanup
  );
  const plans = Planner.planAggregations(
    this.metadata.slice.start,
    toAggregate
  );

  const promises = plans.map(async (plan) => {
    try {
      const paths = await this.db.runTransaction(async (t) => {
        // Paths of the shard documents deleted by this transaction.
        const paths: string[] = [];

        // Read metadata document in transaction to guarantee ownership of the slice.
        const metadocPromise = t.get(this.metadoc.ref);
        // A partial plan does not read the target counter document.
        const counterPromise = plan.isPartial
          ? Promise.resolve(null)
          : t.get(this.db.doc(plan.aggregate));
        // Read all shards in a transaction since we want to delete them immediately.
        // Note that partials are not read here, because we use array transform to
        // update them and don't need transaction guarantees.
        const shardRefs = plan.shards.map((snap) => snap.ref);
        const shardsPromise =
          shardRefs.length > 0
            ? t.getAll(shardRefs[0], ...shardRefs.slice(1))
            : Promise.resolve([]);

        let shards: firestore.DocumentSnapshot[];
        let counter: firestore.DocumentSnapshot;
        let metadoc: firestore.DocumentSnapshot;
        try {
          [shards, counter, metadoc] = await Promise.all([
            shardsPromise,
            counterPromise,
            metadocPromise,
          ]);
        } catch (err) {
          // A failed read ends this plan: the callback returns without
          // queuing any writes.
          logger.log(
            "Unable to read shards during aggregation round, skipping...",
            err
          );
          return [];
        }

        // Check that we still own the slice.
        if (!metadoc.exists || !deepEqual(metadoc.data(), this.metadata)) {
          logger.log("Metadata has changed, bailing out...");
          return [];
        }

        // Calculate aggregated value and save to aggregate shard.
        const aggr = new Aggregator();
        const update = aggr.aggregate(counter, plan.partials, shards);
        t.set(this.db.doc(plan.aggregate), update, { merge: true });

        // Delete shards that have been aggregated.
        shards.forEach((snap) => {
          if (snap.exists) {
            paths.push(snap.ref.path);
            t.delete(snap.ref);
          }
        });

        // Decrement partials by the amount that have been aggregated.
        plan.partials.forEach((snap) => {
          if (snap.exists) {
            const decrement = aggr.subtractPartial(snap);
            t.set(snap.ref, decrement, { merge: true });
          }
        });

        return paths;
      });
      this.allPaths.push(...paths);
    } catch (err) {
      logger.log(
        "transaction to: " + plan.aggregate + " failed, skipping...",
        err
      );
    }
  });

  if (promises.length === 0 && cleanupPromises.length === 0) return;

  // Once this aggregation is done mark it as such.
  const markDone = (): void => {
    this.aggregation = null;
  };
  // The plan promises above catch their own errors, but the cleanup promises
  // come from cleanupPartials, which is not visible here. If one of them
  // rejects, the slot must still be released; otherwise every later call
  // would return early at the guard at the top of this method. The rejection
  // is re-thrown so anyone awaiting `this.aggregation` still observes it.
  this.aggregation = Promise.all(promises.concat(cleanupPromises)).then(
    markDone,
    (err) => {
      markDone();
      throw err;
    }
  );
}