in firestore-counter/functions/src/worker.ts [75:167]
/**
 * Runs the worker until one of its stop conditions is met.
 *
 * While running, `maybeAggregate()` is invoked once per second and
 * `this.shards` is kept in sync with the documents returned by the slice
 * query. The worker stops when:
 *  - `WORKER_TIMEOUT_MS` elapses (stats are written before resolving),
 *  - the metadata document disappears or no longer deep-equals
 *    `this.metadata` (no stats are written), or
 *  - `this.singleRun` is set and the slice query returns no shards (stats
 *    are written before resolving).
 *
 * @returns A promise that resolves once timers and listeners are released,
 *   any in-flight `this.aggregation` has settled, and (where applicable) the
 *   stats write has been attempted. It rejects if teardown fails or if
 *   setting up the timers/listeners throws.
 */
public run(): Promise<void> {
  return new Promise((resolve, reject) => {
    let intervalTimer: ReturnType<typeof setInterval> | undefined;
    let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
    let unsubscribeMetadataListener: (() => void) | undefined;
    let unsubscribeSliceListener: (() => void) | undefined;
    // Set once the first stop condition fires so teardown and the stats write
    // happen at most once, no matter how many triggers arrive.
    let finished = false;

    // Releases timers and listeners, then waits for any in-flight aggregation.
    // Every handle is checked first so this is safe to call after a partially
    // completed setup.
    const shutdown = async (): Promise<void> => {
      if (intervalTimer !== undefined) {
        clearInterval(intervalTimer);
      }
      if (timeoutTimer !== undefined) {
        clearTimeout(timeoutTimer);
      }
      if (unsubscribeMetadataListener) {
        unsubscribeMetadataListener();
      }
      if (unsubscribeSliceListener) {
        unsubscribeSliceListener();
      }
      if (this.aggregation != null) {
        try {
          await this.aggregation;
        } catch (err) {
          // Not much here we can do, transaction is over.
        }
      }
    };

    // Best-effort write of run statistics to the metadata document. Failures
    // are logged and never propagated to the caller.
    const writeStats = async (): Promise<void> => {
      // Sorts `this.allPaths` in place; every 100th path (excluding index 0)
      // is recorded as a split point.
      this.allPaths.sort();
      const splits = this.allPaths.filter(
        (val, idx) => idx !== 0 && idx % 100 === 0
      );
      const stats: WorkerStats = {
        shardsAggregated: this.allPaths.length,
        splits: splits,
        lastSuccessfulRun: Date.now(),
        rounds: this.rounds,
        roundsCapped: this.roundsCapped,
      };
      try {
        await this.db.runTransaction(async (t) => {
          try {
            const snap = await t.get(this.metadoc.ref);
            // Only update while the metadata document still matches what this
            // worker was started with.
            if (snap.exists && deepEqual(snap.data(), this.metadata)) {
              t.update(snap.ref, {
                timestamp: firestore.FieldValue.serverTimestamp(),
                stats: stats,
              });
            }
          } catch (err) {
            logger.log("Failed to save writer stats.", err);
          }
        });
      } catch (err) {
        logger.log("Failed to save writer stats.", err);
      }
    };

    // Single exit path shared by every stop condition.
    const finish = (saveStats: boolean): void => {
      if (finished) {
        return;
      }
      finished = true;
      const teardown = saveStats ? shutdown().then(writeStats) : shutdown();
      teardown.then(resolve).catch(reject);
    };

    try {
      intervalTimer = setInterval(() => {
        this.maybeAggregate();
      }, 1000);

      timeoutTimer = setTimeout(() => finish(true), WORKER_TIMEOUT_MS);

      unsubscribeMetadataListener = this.metadoc.ref.onSnapshot((snap) => {
        // if something's changed in the worker metadata since we were called, abort.
        if (!snap.exists || !deepEqual(snap.data(), this.metadata)) {
          logger.log("Shutting down because metadoc changed.");
          finish(false);
        }
      });

      unsubscribeSliceListener = queryRange(
        this.db,
        this.shardCollection,
        this.metadata.slice.start,
        this.metadata.slice.end,
        SHARDS_LIMIT
      ).onSnapshot((snap) => {
        this.shards = snap.docs;
        if (this.singleRun && this.shards.length === 0) {
          logger.log("Shutting down, single run mode.");
          finish(true);
        }
      });
    } catch (err) {
      // Setup failed part-way: release whatever was already started so the
      // interval and listeners do not outlive the rejected promise.
      finished = true;
      shutdown().then(() => reject(err), reject);
    }
  });
}