in firestore-semantic-search/functions/src/functions/backfill_trigger.ts [28:150]
/**
 * Best-effort removal of a Cloud Storage bucket that may be left over from a
 * previous installation of this extension instance.
 *
 * Files under the `datapoints` prefix are deleted first, then the bucket
 * itself. Any failure is logged and swallowed so that a cleanup problem never
 * blocks the backfill.
 */
async function deleteLeftoverBackfillBucket() {
  try {
    const bucket = admin.storage().bucket(config.bucketName);
    // `exists()` resolves to a one-element tuple (`[boolean]`), not a bare
    // boolean; the tuple itself is always truthy, so it must be destructured.
    const [exists] = await bucket.exists();
    if (!exists) {
      return;
    }
    functions.logger.info(
      `Found an existing bucket ${config.bucketName}, deleting it...`
    );
    await bucket.deleteFiles({prefix: 'datapoints', autoPaginate: false});
    await bucket.delete({ignoreNotFound: true});
  } catch (error) {
    // Cleanup is best-effort: surface the failure but keep going.
    functions.logger.warn(
      `Failed to clean up the existing bucket ${config.bucketName}.`,
      error
    );
  }
}

/**
 * Starts the embeddings backfill for `config.collectionName`.
 *
 * The documents to process (all documents in the collection, or only
 * `document` when given) are split into chunks of `batchSize`. For each chunk a
 * tracking document `${config.tasksDoc}/enqueues/{taskId}` is written with
 * status PENDING. Only the first chunk is enqueued on the `backfillTask` queue
 * here; the remaining chunks are presumably picked up from the tracking
 * documents by the task handler (NOTE(review): confirm against backfillTask).
 *
 * @param forceCreateIndex - Run even when `config.doBackfill` is disabled.
 * @param document - If provided, backfill only this document instead of the
 *   whole collection.
 * @returns The result of `runtime.setProcessingState` describing the outcome.
 * @throws Rethrows any error raised while listing documents, writing tracking
 *   documents or enqueuing, after marking processing as PROCESSING_FAILED.
 */
export async function backfillTriggerHandler({
  forceCreateIndex = false,
  document,
}: {
  forceCreateIndex?: boolean;
  document?: functions.firestore.DocumentSnapshot;
}) {
  const runtime = getExtensions().runtime();
  if (!forceCreateIndex && !config.doBackfill) {
    return runtime.setProcessingState(
      'PROCESSING_WARNING',
      `Backfill is disabled, index setup will start with the first write operation to the collection ${config.collectionName}.`
    );
  }
  const queue = getFunctions().taskQueue(
    `locations/${config.location}/functions/backfillTask`,
    config.instanceId
  );
  // A bucket may be left over from a previous installation; remove it first.
  await deleteLeftoverBackfillBucket();
  try {
    const collection = admin.firestore().collection(config.collectionName);
    const refs = document ? [document.ref] : await collection.listDocuments();
    if (refs.length === 0) {
      return runtime.setProcessingState(
        'PROCESSING_WARNING',
        'No documents found in the collection.'
      );
    }
    functions.logger.info(
      `Found ${refs.length} documents in the collection ${config.collectionName} 📚`
    );
    await admin.firestore().doc(config.tasksDoc).set({
      totalLength: refs.length,
      processedLength: 0,
      status: BackfillStatus.PENDING,
    });
    const chunks = utils.chunkArray(refs, batchSize);
    let writer = admin.firestore().batch();
    let pendingWrites = 0;
    let taskNumber = 0;
    for (const chunk of chunks) {
      taskNumber++;
      const id = `ext-${config.instanceId}-task${taskNumber}`;
      const documentIds = chunk.map(ref => ref.id);
      const isFirstTask = taskNumber === 1;
      const isLastTask = taskNumber === chunks.length;
      // Create a task document to track the progress of the task.
      writer.set(admin.firestore().doc(`${config.tasksDoc}/enqueues/${id}`), {
        taskId: id,
        status: BackfillStatus.PENDING,
        documentIds,
      });
      pendingWrites++;
      // Commit whenever the batch is full, and always after the last task so
      // no tracking documents are left uncommitted. The first task's document
      // is committed immediately so it exists before that task is enqueued.
      if (isFirstTask || isLastTask || pendingWrites >= batchSize) {
        functions.logger.info('Committing the batch...');
        await writer.commit();
        writer = admin.firestore().batch();
        pendingWrites = 0;
      }
      if (isFirstTask) {
        // Enqueue the first task to be executed immediately.
        functions.logger.info(`Enqueuing the first task ${id} 🚀`);
        await queue.enqueue({
          id: id,
          collectionName: config.collectionName,
          documentIds,
        });
      }
    }
    functions.logger.info(`${chunks.length} tasks enqueued successfully 🚀`);
    return runtime.setProcessingState(
      'PROCESSING_COMPLETE',
      'Successfully enqueued all tasks to backfill the data.'
    );
  } catch (error) {
    functions.logger.error(error);
    await runtime.setProcessingState(
      'PROCESSING_FAILED',
      'Failed to generate embeddings, for more details check the logs.'
    );
    throw error;
  }
}