in firestore-counter/functions/src/controller.ts [68:131]
/**
 * Watches a slice of the shard collection and runs `aggregateOnce` whenever
 * the slice changes, until `timeoutMillis` has elapsed.
 *
 * At most one aggregation round is in flight at a time. A round is not
 * started when the controller document currently lists workers, or when the
 * snapshot holds exactly `limit` documents.
 *
 * @param slice Range of shards to watch and aggregate (`slice.start` to `slice.end`).
 * @param limit Maximum number of shard documents queried per snapshot/round.
 * @param timeoutMillis Delay after which the listeners are removed and the
 *   returned promise settles.
 * @returns A promise that resolves (never rejects) once the timeout has fired
 *   and any round still in flight at that moment has finished.
 */
public async aggregateContinuously(
  slice: Slice,
  limit: number,
  timeoutMillis: number
): Promise<void> {
  return new Promise<void>((resolve) => {
    // The round currently running, or null when idle.
    let inFlight: Promise<void> | null = null;
    let controllerData: ControllerData = EMPTY_CONTROLLER_DATA;
    let rounds = 0;
    let skippedRoundsDueToWorkers = 0;
    let shardsCount = 0;

    // Runs a single aggregation round and updates the counters. Never
    // rejects: a failure is logged so that it can neither block later rounds
    // nor prevent shutdown from completing.
    const runRound = async (shardsInSnapshot: number): Promise<void> => {
      try {
        const status = await this.aggregateOnce(slice, limit);
        if (status === ControllerStatus.SUCCESS) {
          shardsCount += shardsInSnapshot;
          rounds++;
        }
      } catch (err) {
        logger.log("Aggregation round failed: " + String(err));
      }
    };

    // Keep a local copy of the controller document so the slice listener can
    // check for active workers without an extra read.
    const unsubscribeControllerListener = this.controllerDocRef.onSnapshot(
      (snap) => {
        if (snap.exists) {
          controllerData = <ControllerData>snap.data();
        }
      }
    );

    const unsubscribeSliceListener = queryRange(
      this.db,
      this.shardCollectionId,
      slice.start,
      slice.end,
      limit
    ).onSnapshot(async (snap) => {
      // NOTE(review): a full page is skipped here; presumably such a backlog
      // is meant to be handled by workers instead - confirm.
      if (snap.docs.length === limit) return;
      if (controllerData.workers.length > 0) {
        skippedRoundsDueToWorkers++;
        return;
      }
      if (inFlight === null) {
        inFlight = runRound(snap.docs.length);
        await inFlight;
        inFlight = null;
      }
    });

    const shutdown = async () => {
      try {
        // Stop listening first so no new round can start while we drain.
        unsubscribeControllerListener();
        unsubscribeSliceListener();
        // Wait for a round that is still running (the original check was
        // inverted and only "awaited" when nothing was running).
        if (inFlight !== null) await inFlight;
        // Log after draining so the final round is included in the totals.
        logger.log(
          "Successfully ran " +
            rounds +
            " rounds. Aggregated " +
            shardsCount +
            " shards."
        );
        logger.log(
          "Skipped " +
            skippedRoundsDueToWorkers +
            " rounds due to workers running."
        );
      } finally {
        // Always settle, otherwise the caller would wait forever.
        resolve();
      }
    };
    setTimeout(shutdown, timeoutMillis);
  });
}