in firestore-bigquery-export/scripts/import/src/run.ts [20:105]
async function processCollectionGroup(config: CliConfig): Promise<number> {
const maxWorkers = Math.ceil(cpus().length / 2);
const workerPool = pool(__dirname + "/worker.js", {
maxWorkers,
forkOpts: {
env: {
PROJECT_ID: config.projectId,
GOOGLE_CLOUD_PROJECT: config.projectId,
GCLOUD_PROJECT: config.projectId,
...process.env,
},
},
});
const query = firebase
.firestore()
.collectionGroup(config.sourceCollectionPath);
const partitionsList = query.getPartitions(config.batchSize);
let total = 0;
let partitions = 0;
while (true) {
const inProgressTasks =
workerPool.stats().activeTasks + workerPool.stats().pendingTasks;
if (inProgressTasks >= maxWorkers) {
// A timeout is needed here to stop infinite rechecking of workpool.stats().
await new Promise((resolve) => setTimeout(resolve, 150, []));
continue;
}
// @ts-ignore, iterator not typed correctly.
const { value: partition, done } = await partitionsList.next();
if (done || !partition) {
break;
}
partitions++;
const query = partition.toQuery();
const serializedQuery = {
startAt: query._queryOptions.startAt,
endAt: query._queryOptions.endAt,
limit: query._queryOptions.limit,
offset: query._queryOptions.offset,
};
workerPool
.exec("processDocuments", [serializedQuery, config])
.then((count) => {
total += count;
console.log(`${total} documents processed`);
})
.catch((error) => {
console.error(
"An error has occurred on the following documents, please re-run or insert the following query documents manually...",
JSON.stringify(serializedQuery)
);
console.error(error);
process.exit(1);
});
}
// Wait for all tasks to be complete.
while (workerPool.stats().activeTasks + workerPool.stats().pendingTasks > 0) {
// Return a default promise
// A timeout is needed here to stop infinite rechecking of workpool.stats().
await new Promise((resolve) => setTimeout(resolve, 150, []));
}
await workerPool.terminate();
console.log(`Imported ${total} documents in ${partitions} partitions.`);
console.log("---------------------------------------------------------");
console.log(
`Please see https://console.cloud.google.com/bigquery?p=${
config.projectId
}&d=${config.datasetId}&t=${config.tableId}_raw_changelog&page=table`
);
console.log("---------------------------------------------------------");
return Promise.resolve(total);
}