in firestore-counter/functions/lib/worker.js [56:133]
run() {
return new Promise((resolve, reject) => {
let intervalTimer;
let timeoutTimer;
let unsubscribeMetadataListener;
let unsubscribeSliceListener;
const shutdown = async () => {
clearInterval(intervalTimer);
clearTimeout(timeoutTimer);
unsubscribeMetadataListener();
unsubscribeSliceListener();
if (this.aggregation != null) {
try {
await this.aggregation;
}
catch (err) {
// Not much here we can do, transaction is over.
}
}
};
const writeStats = async () => {
this.allPaths.sort();
let splits = this.allPaths.filter((val, idx) => idx !== 0 && idx % 100 === 0);
let stats = {
shardsAggregated: this.allPaths.length,
splits: splits,
lastSuccessfulRun: Date.now(),
rounds: this.rounds,
roundsCapped: this.roundsCapped,
};
try {
await this.db.runTransaction(async (t) => {
try {
const snap = await t.get(this.metadoc.ref);
if (snap.exists && deepEqual(snap.data(), this.metadata)) {
t.update(snap.ref, {
timestamp: firebase_admin_1.firestore.FieldValue.serverTimestamp(),
stats: stats,
});
}
}
catch (err) {
firebase_functions_1.logger.log("Failed to save writer stats.", err);
}
});
}
catch (err) {
firebase_functions_1.logger.log("Failed to save writer stats.", err);
}
};
intervalTimer = setInterval(() => {
this.maybeAggregate();
}, 1000);
timeoutTimer = setTimeout(() => shutdown()
.then(writeStats)
.then(resolve)
.catch(reject), WORKER_TIMEOUT_MS);
unsubscribeMetadataListener = this.metadoc.ref.onSnapshot((snap) => {
// if something's changed in the worker metadata since we were called, abort.
if (!snap.exists || !deepEqual(snap.data(), this.metadata)) {
firebase_functions_1.logger.log("Shutting down because metadoc changed.");
shutdown()
.then(resolve)
.catch(reject);
}
});
unsubscribeSliceListener = common_1.queryRange(this.db, this.shardCollection, this.metadata.slice.start, this.metadata.slice.end, SHARDS_LIMIT).onSnapshot((snap) => {
this.shards = snap.docs;
if (this.singleRun && this.shards.length === 0) {
firebase_functions_1.logger.log("Shutting down, single run mode.");
shutdown()
.then(writeStats)
.then(resolve)
.catch(reject);
}
});
});
}