export async function backfillTriggerHandler()

in firestore-semantic-search/functions/src/functions/backfill_trigger.ts [28:150]


/**
 * Kicks off the backfill process for the configured collection.
 *
 * Cleans up any left-over backfill bucket from a previous installation,
 * then chunks the collection's document refs and enqueues the first chunk
 * as a task, writing one tracking document per chunk under
 * `${config.tasksDoc}/enqueues/`.
 *
 * @param forceCreateIndex - When true, run even if `config.doBackfill` is false.
 * @param document - Optional single snapshot; when set, only that document is
 *   backfilled instead of listing the whole collection.
 * @throws Rethrows any Firestore/queue error after recording
 *   `PROCESSING_FAILED` on the extension runtime.
 */
export async function backfillTriggerHandler({
  forceCreateIndex = false,
  document,
}: {
  forceCreateIndex?: boolean;
  document?: functions.firestore.DocumentSnapshot;
}) {
  const runtime = getExtensions().runtime();

  if (!forceCreateIndex && !config.doBackfill) {
    return runtime.setProcessingState(
      'PROCESSING_WARNING',
      `Backfill is disabled, index setup will start with the first write operation to the collection ${config.collectionName}.`
    );
  }

  const queue = getFunctions().taskQueue(
    `locations/${config.location}/functions/backfillTask`,
    config.instanceId
  );
  let writer = admin.firestore().batch();

  // Check if the backfill bucket exists, if so, delete any files in it.
  // This might be a left-over from a previous installation.
  try {
    const bucket = admin.storage().bucket(config.bucketName);

    // BUG FIX: bucket.exists() resolves to a one-element array [boolean],
    // so awaiting it and testing truthiness was always true. Destructure
    // the boolean instead.
    const [bucketExists] = await bucket.exists();

    if (bucketExists) {
      functions.logger.info(
        `Found an existing bucket ${config.bucketName}, deleting it...`
      );

      await bucket.deleteFiles({prefix: 'datapoints', autoPaginate: false});
      await bucket.delete({ignoreNotFound: true});
    }
  } catch (error) {
    // Ignore the error if the bucket doesn't exist.
    functions.logger.debug(error);
  }

  try {
    const collection = admin.firestore().collection(config.collectionName);

    const refs = document ? [document.ref] : await collection.listDocuments();

    if (refs.length === 0) {
      return runtime.setProcessingState(
        'PROCESSING_WARNING',
        'No documents found in the collection.'
      );
    }

    functions.logger.info(
      `Found ${refs.length} documents in the collection ${config.collectionName} 📚`
    );

    let counter = 1;

    await admin.firestore().doc(config.tasksDoc).set({
      totalLength: refs.length,
      processedLength: 0,
      status: BackfillStatus.PENDING,
    });

    const chunks = utils.chunkArray(refs, batchSize);

    // Number of writes buffered in the current batch but not yet committed.
    let pendingWrites = 0;

    for (const chunk of chunks) {
      const id = `ext-${config.instanceId}-task${counter}`;

      if (counter === 1) {
        // Enqueue the first task to be executed immediately; subsequent
        // tasks are driven by their tracking documents.
        functions.logger.info(`Enqueuing the first task ${id} 🚀`);

        await queue.enqueue({
          id: id,
          collectionName: config.collectionName,
          documentIds: chunk.map(ref => ref.id),
        });
      }

      try {
        // Create a task document to track the progress of the task.
        writer.set(admin.firestore().doc(`${config.tasksDoc}/enqueues/${id}`), {
          taskId: id,
          status: BackfillStatus.PENDING,
          documentIds: chunk.map(ref => ref.id),
        });
        pendingWrites++;

        // BUG FIX: the old condition (counter % batchSize === 0 ||
        // chunks.length < batchSize) never committed the final partial
        // batch when chunks.length was a non-multiple of batchSize, so
        // trailing task documents were silently dropped. Commit whenever
        // the batch is full; leftovers are committed after the loop.
        if (pendingWrites === batchSize) {
          functions.logger.info('Committing the batch...');

          await writer.commit();
          writer = admin.firestore().batch();
          pendingWrites = 0;
        }
      } catch (error) {
        functions.logger.error(error);
        await runtime.setProcessingState(
          'PROCESSING_FAILED',
          'Failed to generate embeddings, for more details check the logs.'
        );

        throw error;
      }

      counter++;
    }

    // Flush any writes left over from the final partial batch.
    if (pendingWrites > 0) {
      functions.logger.info('Committing the batch...');
      await writer.commit();
    }

    // BUG FIX: counter starts at 1 and increments past the last chunk, so
    // the old `${counter}` message overcounted by one; report chunks.length.
    functions.logger.info(`${chunks.length} tasks enqueued successfully 🚀`);

    return runtime.setProcessingState(
      'PROCESSING_COMPLETE',
      'Successfully enqueued all tasks to backfill the data.'
    );
  } catch (error) {
    functions.logger.error(error);
    await runtime.setProcessingState(
      'PROCESSING_FAILED',
      'Failed to generate embeddings, for more details check the logs.'
    );

    throw error;
  }
}