export async function streamUpdateDatapointHandler()

in firestore-semantic-search/functions/src/functions/stream_update_datapoint.ts [28:97]


/**
 * Embeds a Firestore document and upserts the resulting datapoint into the
 * index.
 *
 * Flow:
 *  1. If an index exists but is not deployed, or the index is still building,
 *     a `datapointWriteTask` is enqueued with a one-hour delay and the handler
 *     returns without touching the index.
 *  2. Documents whose reference fails `utils.isValidReference`, that have no
 *     data, or that have no fields are skipped.
 *  3. If `getDatapoint` yields nothing, the handler returns.
 *  4. If no index exists, `backfillTriggerHandler` is invoked with
 *     `forceCreateIndex: true`; otherwise the datapoint is upserted.
 *
 * Errors raised in step 4 are logged and deliberately not rethrown.
 *
 * @param snap Snapshot of the Firestore document to index.
 */
export async function streamUpdateDatapointHandler(
  snap: FirebaseFirestore.DocumentSnapshot
): Promise<void> {
  const {status, index} = await checkIndexStatus();
  if (
    (index && status !== IndexStatus.DEPLOYED) ||
    status === IndexStatus.BUILDING
  ) {
    functions.logger.info('Index not deployed yet, skipping...');
    const queue = getFunctions().taskQueue(
      'datapointWriteTask',
      config.instanceId
    );

    // Index isn't ready yet, retry in an hour.
    // NOTE(review): this is the *update* handler, yet the deferred task is
    // enqueued with operation 'remove'. This looks like a copy-paste slip;
    // confirm against the `datapointWriteTask` handler which operation value
    // re-indexes the document, and switch to it if so.
    await queue.enqueue(
      {
        operation: 'remove',
        docId: snap.id,
      },
      {
        scheduleDelaySeconds: 60 * 60,
      }
    );
    return;
  }

  if (!utils.isValidReference(snap.ref)) {
    functions.logger.info(`Skipping ${snap.ref.path}`);
    return;
  }

  const data = snap.data();

  // Nothing to embed for a missing or empty document.
  if (!data) return;
  if (Object.keys(data).length === 0) return;

  functions.logger.debug('Data to be embedded', {data});
  const datapoint = await getDatapoint(snap.ref, data);

  if (!datapoint) return;
  functions.logger.info('Datapoint generated 🎉');
  try {
    functions.logger.info('Upserting datapoint to index', datapoint);

    if (!index) {
      functions.logger.error(
        'Index not found, creating a new one and retrying...'
      );

      await backfillTriggerHandler({
        forceCreateIndex: true,
        document: snap,
      });

      return;
    }

    // Upsert the datapoint to the index.
    await upsertDatapoint(index, [
      {
        datapoint_id: datapoint.id,
        feature_vector: datapoint.embedding,
      },
    ]);
  } catch (error) {
    // Axios failures carry the HTTP response, which is the most useful thing
    // to log. Any other thrown value has no `response`, so fall back to the
    // error itself instead of logging `undefined` and losing the failure.
    const response = (error as AxiosError | null | undefined)?.response;
    functions.logger.error(response ?? error);
  }
}