public async rescheduleWorkers()

in firestore-counter/functions/src/controller.ts [222:333]


  public async rescheduleWorkers() {
    const timestamp = Date.now();

    await this.db.runTransaction(async (t) => {
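      // The whole rebalance runs inside a single transaction, so the controller
      // doc and all worker metadata are read and rewritten atomically.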
      // Read controller document to prevent race conditions.
      try {
        await t.get(this.controllerDocRef);
      } catch (err) {
        logger.log(
          "Failed to read controller doc " + this.controllerDocRef.path,
          err
        );
        throw Error("Failed to read controller doc.");
      }
      // Read all workers' metadata and construct sharding info based on collected stats.
      let query: firestore.QuerySnapshot = null;
      try {
        query = await t.get(
          this.workersRef.orderBy(firestore.FieldPath.documentId())
        );
      } catch (err) {
        logger.log("Failed to read worker docs.", err);
        throw Error("Failed to read worker docs.");
      }
      let shardingInfo: WorkerShardingInfo[] = await Promise.all(
        query.docs.map(async (worker) => {
          const slice: Slice = worker.get("slice");
          const stats: WorkerStats = worker.get("stats");
          // This worker hasn't had a chance to finish its run yet. Bail out.
          if (!stats) {
            return {
              slice: slice,
              hasData: false,
              overloaded: false,
              splits: [],
            };
          }

          const hasData = true;
          const overloaded = stats.rounds === stats.roundsCapped;
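          // The worker is treated as overloaded when every aggregation round
          // it ran was capped, i.e. it could not keep up with its slice.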
          const splits = stats.splits;
          // If a worker is overloaded, we don't have reliable splits for that range.
          // Fetch extra shards to make a better balancing decision.
          try {
            if (overloaded && splits.length > 0) {
              const snap = await queryRange(
                this.db,
                this.shardCollectionId,
                splits[splits.length - 1],
                slice.end,
                100000
              ).get();
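              // Record every 100th shard path as an extra split point so the
              // overloaded range can be divided more evenly.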
              for (let i = 100; i < snap.docs.length; i += 100) {
                splits.push(snap.docs[i].ref.path);
              }
            }
          } catch (err) {
            logger.log(
              "Failed to calculate additional splits for worker: " + worker.id,
              err
            );
          }
          return { slice, hasData, overloaded, splits };
        })
      );

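      // balanceWorkers decides whether the current slice assignment should be
      // replaced and, if so, returns the new slice boundaries.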
      let [reshard, slices] = ShardedCounterController.balanceWorkers(
        shardingInfo
      );
      if (reshard) {
        logger.log(
          "Resharding workers, new workers: " +
            slices.length +
            " prev num workers: " +
            query.docs.length
        );
        query.docs.forEach((snap) => t.delete(snap.ref));
        slices.forEach((slice, index) => {
          t.set(
            this.workersRef.doc(
              ShardedCounterController.encodeWorkerKey(index)
            ),
            {
              slice: slice,
              timestamp: firestore.FieldValue.serverTimestamp(),
            }
          );
        });
        t.set(this.controllerDocRef, {
          workers: slices,
          timestamp: firestore.FieldValue.serverTimestamp(),
        });
      } else {
        // Check workers that haven't updated stats for over 90s - they most likely failed.
        let failures = 0;
        query.docs.forEach((snap) => {
          if (timestamp / 1000 - snap.updateTime.seconds > 90) {
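            // Touch the stale worker doc with a fresh server timestamp;
            // the merge keeps its slice assignment intact.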
            t.set(
              snap.ref,
              { timestamp: firestore.FieldValue.serverTimestamp() },
              { merge: true }
            );
            failures++;
          }
        });
        logger.log("Detected " + failures + " failed workers.");
        t.set(
          this.controllerDocRef,
          { timestamp: firestore.FieldValue.serverTimestamp() },
          { merge: true }
        );
      }
    });
  }
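
For context, here is a minimal sketch of how rescheduleWorkers might be driven from a scheduled Cloud Function. The constructor arguments, document path, collection id, and schedule below are assumptions for illustration only; the extension's actual wiring may differ.

import * as functions from "firebase-functions";
import * as admin from "firebase-admin";
import { ShardedCounterController } from "./controller";

admin.initializeApp();
const db = admin.firestore();

// All names below (document path, collection id, schedule) are illustrative
// assumptions, not the extension's actual configuration.
const controller = new ShardedCounterController(
  db.doc("_counter_shards_/controller"),
  "_counter_shards_"
);

export const controllerCron = functions.pubsub
  .schedule("every 1 minutes")
  .onRun(async () => {
    // rescheduleWorkers runs inside its own Firestore transaction, so
    // overlapping invocations contend safely rather than corrupting state.
    await controller.rescheduleWorkers();
  });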