export async function backfillTriggerHandler()

in storage-reverse-image-search/functions/src/functions/backfill_trigger.ts [28:175]


export async function backfillTriggerHandler({
  forceCreateIndex,
  object,
}: {
  forceCreateIndex?: boolean;
  object?: functions.storage.ObjectMetadata;
} = {}) {
  const runtime = getExtensions().runtime();

  if (!forceCreateIndex && !config.doBackfill) {
    return runtime.setProcessingState(
      'PROCESSING_WARNING',
      'Backfill is disabled, index setup will start with the first image is added.'
    );
  }

  if (forceCreateIndex && object?.name) {
    functions.logger.info(
      `Forcing index creation via initial image upload: ${object.name}`
    );

    const featureVectors = await getFeatureVectors([object.name]);
    if (!featureVectors?.[0]?.length) {
      throw new Error('Failed to generate feature vector for initial image.');
    }

    const outputShape = featureVectors[0].length;

    await admin.firestore().doc(config.metadataDoc).set(
      {
        outputShape,
      },
      {merge: true}
    );

    await admin.firestore().doc(config.tasksDoc).set(
      {
        status: BackfillStatus.DONE,
      },
      {merge: true}
    );

    return runtime.setProcessingState(
      'PROCESSING_COMPLETE',
      'Backfill is disabled but index creation was initiated with first image.'
    );
  }

  const queue = getFunctions().taskQueue('backfillTask', config.instanceId);
  let writer = admin.firestore().batch();

  // Check if the bucket exists, if so, delete any files in it.
  // This might be a left-over from a previous installation.
  try {
    const bucket = await utils.getEmbeddingsBucket();
    await bucket.deleteFiles({prefix: 'datapoints', autoPaginate: false});
  } catch (error) {
    functions.logger.error('Failed to get or clean bucket:', error);
  }

  try {
    const objects = await utils.listImagesInBucket(object);

    if (objects.length === 0) {
      return runtime.setProcessingState(
        'PROCESSING_WARNING',
        'No images found in the bucket. You can start uploading images to the bucket to generate embeddings.'
      );
    }

    functions.logger.info(
      `Found ${objects.length} objects in path ${objects[0].parent.name} 📚`
    );

    let counter = 1;

    await admin.firestore().doc(config.tasksDoc).set({
      totalLength: objects.length,
      processedLength: 0,
      status: BackfillStatus.PENDING,
    });

    const chunks = utils.chunkArray<File>(objects, config.batchSize);

    for (const chunk of chunks) {
      const id = `ext-${config.instanceId}-task${counter}`;

      if (counter === 1) {
        // Enqueue the first task to be executed immediately.
        functions.logger.info(`Enqueuing the first task ${id} 🚀`);

        await queue.enqueue({
          id: id,
          bucket: config.imgBucket,
          objects: chunk.map(object => object.name),
        });
      }

      try {
        // Create a task document to track the progress of the task.
        writer.set(
          admin.firestore().doc(`${config.tasksDoc}/enqueues/${id}`),
          {
            taskId: id,
            status: BackfillStatus.PENDING,
            objects: chunk.map(object => object.name),
          },
          {merge: false}
        );

        if (
          counter % config.batchSize === 0 ||
          chunks.length < config.batchSize
        ) {
          functions.logger.info('Committing the batch...');

          await writer.commit();
          writer = admin.firestore().batch();
        }
      } catch (error) {
        functions.logger.error(error);
        await runtime.setProcessingState(
          'PROCESSING_FAILED',
          'Failed to generate embeddings, for more details check the logs.'
        );

        throw error;
      }

      counter++;
    }

    functions.logger.info(`${counter} tasks enqueued successfully 🚀`);

    return runtime.setProcessingState(
      'PROCESSING_COMPLETE',
      'Successfully enqueued all tasks to backfill the data.'
    );
  } catch (error) {
    functions.logger.error(error);
    await runtime.setProcessingState(
      'PROCESSING_FAILED',
      'Failed to generate embeddings, for more details check the logs.'
    );

    throw error;
  }
}