in firestore-counter/functions/src/controller.ts [222:333]
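/**
 * Runs inside a single transaction: reads the controller doc and every
 * worker's stats, then either reshards the workload across a new set of
 * workers or refreshes timestamps to restart workers that appear stalled.
 */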
public async rescheduleWorkers() {
const timestamp = Date.now();
await this.db.runTransaction(async (t) => {
// Read controller document to prevent race conditions.
try {
await t.get(this.controllerDocRef);
} catch (err) {
logger.log(
"Failed to read controller doc " + this.controllerDocRef.path
);
throw Error("Failed to read controller doc.");
}
// Read all workers' metadata and construct sharding info based on collected stats.
let query: firestore.QuerySnapshot = null;
try {
query = await t.get(
this.workersRef.orderBy(firestore.FieldPath.documentId())
);
} catch (err) {
logger.log("Failed to read worker docs.", err);
throw Error("Failed to read worker docs.");
}
let shardingInfo: WorkerShardingInfo[] = await Promise.all(
query.docs.map(async (worker) => {
const slice: Slice = worker.get("slice");
const stats: WorkerStats = worker.get("stats");
// This worker hasn't had a chance to finish its run yet. Bail out.
if (!stats) {
return {
slice: slice,
hasData: false,
overloaded: false,
splits: [],
};
}
const hasData = true;
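// If every round this worker ran hit the cap, it could not drain its
// slice and is considered overloaded.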
const overloaded = stats.rounds === stats.roundsCapped;
const splits = stats.splits;
// If a worker is overloaded, we don't have reliable splits for that range.
// Fetch extra shards to make a better balancing decision.
try {
if (overloaded && splits.length > 0) {
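// Scan shards from the last reported split to the end of this slice
// (limited to 100000 documents) to find additional split candidates.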
const snap = await queryRange(
this.db,
this.shardCollectionId,
splits[splits.length - 1],
slice.end,
100000
).get();
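// Keep every 100th document path as an extra split point for rebalancing.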
for (let i = 100; i < snap.docs.length; i += 100) {
splits.push(snap.docs[i].ref.path);
}
}
} catch (err) {
logger.log(
"Failed to calculate additional splits for worker: " + worker.id
);
}
return { slice, hasData, overloaded, splits };
})
);
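// Decide whether the current worker assignment needs to change and,
// if so, compute the new set of slices.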
let [reshard, slices] = ShardedCounterController.balanceWorkers(
shardingInfo
);
if (reshard) {
logger.log(
"Resharding workers, new workers: " +
slices.length +
" prev num workers: " +
query.docs.length
);
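// Replace the current workers: delete their docs and create one doc per
// new slice, each carrying its slice and a server timestamp.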
query.docs.forEach((snap) => t.delete(snap.ref));
slices.forEach((slice, index) => {
t.set(
this.workersRef.doc(
ShardedCounterController.encodeWorkerKey(index)
),
{
slice: slice,
timestamp: firestore.FieldValue.serverTimestamp(),
}
);
});
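// Publish the new slice assignment on the controller doc.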
t.set(this.controllerDocRef, {
workers: slices,
timestamp: firestore.FieldValue.serverTimestamp(),
});
} else {
// Check workers that haven't updated stats for over 90s - they most likely failed.
let failures = 0;
query.docs.forEach((snap) => {
if (timestamp / 1000 - snap.updateTime.seconds > 90) {
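// Touch the stalled worker's doc to refresh its timestamp (the write
// presumably re-triggers the worker function so it can resume).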
t.set(
snap.ref,
{ timestamp: firestore.FieldValue.serverTimestamp() },
{ merge: true }
);
failures++;
}
});
logger.log("Detected " + failures + " failed workers.");
t.set(
this.controllerDocRef,
{ timestamp: firestore.FieldValue.serverTimestamp() },
{ merge: true }
);
}
});
}