in firestore-counter/functions/lib/controller.js [173:254]
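/**
 * Rebalances the counter workers: reads the stats each worker reported, asks
 * balanceWorkers whether the keyspace slices need re-partitioning, and either
 * rewrites the worker set or refreshes the timestamps of workers that appear
 * to have stalled. Everything happens in one transaction so concurrent
 * controller runs can't make conflicting scheduling decisions.
 */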
async rescheduleWorkers() {
    const timestamp = Date.now();
    await this.db.runTransaction(async (t) => {
        // Read controller document to prevent race conditions.
        try {
            await t.get(this.controllerDocRef);
        }
        catch (err) {
            firebase_functions_1.logger.log("Failed to read controller doc " + this.controllerDocRef.path);
            throw Error("Failed to read controller doc.");
        }
        // Read all workers' metadata and construct sharding info based on collected stats.
        let query = null;
        try {
            query = await t.get(this.workersRef.orderBy(firebase_admin_1.firestore.FieldPath.documentId()));
        }
        catch (err) {
            firebase_functions_1.logger.log("Failed to read worker docs.", err);
            throw Error("Failed to read worker docs.");
        }
        let shardingInfo = await Promise.all(query.docs.map(async (worker) => {
            const slice = worker.get("slice");
            const stats = worker.get("stats");
            // This worker hasn't had a chance to finish its run yet. Bail out.
            if (!stats) {
                return {
                    slice: slice,
                    hasData: false,
                    overloaded: false,
                    splits: [],
                };
            }
            const hasData = true;
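            // "Overloaded" means every aggregation round this worker ran was
            // capped, so it likely never caught up with its slice.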
            const overloaded = stats.rounds === stats.roundsCapped;
            const splits = stats.splits;
            // If a worker is overloaded, we don't have reliable splits for that range.
            // Fetch extra shards to make a better balancing decision.
            try {
                if (overloaded && splits.length > 0) {
                    const snap = await common_1.queryRange(this.db, this.shardCollectionId, splits[splits.length - 1], slice.end, 100000).get();
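                        // Sample every 100th shard document as an extra split
                        // point so balanceWorkers has finer-grained boundaries
                        // to work with for this overloaded range.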
                    for (let i = 100; i < snap.docs.length; i += 100) {
                        splits.push(snap.docs[i].ref.path);
                    }
                }
            }
            catch (err) {
                firebase_functions_1.logger.log("Failed to calculate additional splits for worker: " + worker.id);
            }
            return { slice, hasData, overloaded, splits };
        }));
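        // balanceWorkers reports whether a reshard is needed and returns the
        // slice assignment for the (possibly new) set of workers.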
        let [reshard, slices] = ShardedCounterController.balanceWorkers(shardingInfo);
        if (reshard) {
            firebase_functions_1.logger.log("Resharding workers, new workers: " +
                slices.length +
                " prev num workers: " +
                query.docs.length);
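            // Replace the whole worker set: delete the old worker docs and write
            // one doc per new slice (each write presumably kicks off that worker).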
            query.docs.forEach((snap) => t.delete(snap.ref));
            slices.forEach((slice, index) => {
                t.set(this.workersRef.doc(ShardedCounterController.encodeWorkerKey(index)), {
                    slice: slice,
                    timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp(),
                });
            });
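            // Record the new slice assignment and a fresh timestamp on the
            // controller doc.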
            t.set(this.controllerDocRef, {
                workers: slices,
                timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp(),
            });
        }
        else {
            // Check workers that haven't updated stats for over 90s - they most likely failed.
            let failures = 0;
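            // Touch every worker doc that hasn't been written in the last 90s;
            // the timestamp write should nudge the stalled worker to run again.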
            query.docs.forEach((snap) => {
                if (timestamp / 1000 - snap.updateTime.seconds > 90) {
                    t.set(snap.ref, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
                    failures++;
                }
            });
            firebase_functions_1.logger.log("Detected " + failures + " failed workers.");
            t.set(this.controllerDocRef, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
        }
    });
}