async rescheduleWorkers()

in firestore-counter/functions/lib/controller.js [173:254]


    async rescheduleWorkers() {
        const timestamp = Date.now();
        await this.db.runTransaction(async (t) => {
            // Read controller document to prevent race conditions.
            try {
                await t.get(this.controllerDocRef);
            }
            catch (err) {
                firebase_functions_1.logger.log("Failed to read controller doc " + this.controllerDocRef.path);
                throw Error("Failed to read controller doc.");
            }
            // Read all workers' metadata and construct sharding info based on collected stats.
            let query = null;
            try {
                query = await t.get(this.workersRef.orderBy(firebase_admin_1.firestore.FieldPath.documentId()));
            }
            catch (err) {
                firebase_functions_1.logger.log("Failed to read worker docs.", err);
                throw Error("Failed to read worker docs.");
            }
            let shardingInfo = await Promise.all(query.docs.map(async (worker) => {
                const slice = worker.get("slice");
                const stats = worker.get("stats");
                // This workers hasn't had a chance to finish its run yet. Bail out.
                if (!stats) {
                    return {
                        slice: slice,
                        hasData: false,
                        overloaded: false,
                        splits: [],
                    };
                }
                const hasData = true;
                const overloaded = stats.rounds === stats.roundsCapped;
                const splits = stats.splits;
                // If a worker is overloaded, we don't have reliable splits for that range.
                // Fetch extra shards to make better balancing decision.
                try {
                    if (overloaded && splits.length > 0) {
                        const snap = await common_1.queryRange(this.db, this.shardCollectionId, splits[splits.length - 1], slice.end, 100000).get();
                        for (let i = 100; i < snap.docs.length; i += 100) {
                            splits.push(snap.docs[i].ref.path);
                        }
                    }
                }
                catch (err) {
                    firebase_functions_1.logger.log("Failed to calculate additional splits for worker: " + worker.id);
                }
                return { slice, hasData, overloaded, splits };
            }));
            let [reshard, slices] = ShardedCounterController.balanceWorkers(shardingInfo);
            if (reshard) {
                firebase_functions_1.logger.log("Resharding workers, new workers: " +
                    slices.length +
                    " prev num workers: " +
                    query.docs.length);
                query.docs.forEach((snap) => t.delete(snap.ref));
                slices.forEach((slice, index) => {
                    t.set(this.workersRef.doc(ShardedCounterController.encodeWorkerKey(index)), {
                        slice: slice,
                        timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp(),
                    });
                });
                t.set(this.controllerDocRef, {
                    workers: slices,
                    timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp(),
                });
            }
            else {
                // Check workers that haven't updated stats for over 90s - they most likely failed.
                let failures = 0;
                query.docs.forEach((snap) => {
                    if (timestamp / 1000 - snap.updateTime.seconds > 90) {
                        t.set(snap.ref, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
                        failures++;
                    }
                });
                firebase_functions_1.logger.log("Detected " + failures + " failed workers.");
                t.set(this.controllerDocRef, { timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp() }, { merge: true });
            }
        });
    }