export async function streamUpdateDatapointHandler()

in storage-reverse-image-search/functions/src/functions/stream_update_datapoint.ts [29:99]


export async function streamUpdateDatapointHandler(
  object: functions.storage.ObjectMetadata
) {
  const doc = await checkIndexStatus();
  const {status, index} = doc || {};

  if (
    (index && status !== IndexStatus.DEPLOYED) ||
    status === IndexStatus.BUILDING
  ) {
    functions.logger.info('Index not deployed yet, skipping...');
    const queue = getFunctions().taskQueue(
      'datapointWriteTask',
      config.instanceId
    );
    // Index isn't ready yet, retry in an hour.
    await queue.enqueue(
      {
        operation: 'update',
        imagePath: object.name,
      },
      {
        scheduleDelaySeconds: 60 * 60,
      }
    );
    return;
  }

  if (!object.name) return;
  if (!utils.isImage(object.name)) {
    functions.logger.info(`Skipping ${object.name}, not an image...`);
    return;
  }

  functions.logger.info(`Processing ${object.name}...`);

  const imagePath = object.name;

  const vector = await getFeatureVectors([imagePath]);
  if (!vector) return;

  functions.logger.info('Datapoint generated 🎉');

  try {
    // Get the index name from the metadata document.
    const metdata = await admin.firestore().doc(config.metadataDoc).get();
    const index = metdata.data()?.index;

    if (!index) {
      functions.logger.error(
        'Index not found, creating a new one and retrying...'
      );

      await backfillTriggerHandler({
        forceCreateIndex: true,
        object: object,
      });
      return;
    }

    // Upsert the datapoint to the index.
    await upsertDatapoint(index, [
      {
        datapoint_id: object.name,
        feature_vector: vector[0],
      },
    ]);
  } catch (error) {
    functions.logger.error((error as AxiosError).response);
  }
}