export async function datapointWriteTaskHandler()

in firestore-semantic-search/functions/src/functions/datapoint_write_task.ts [31:105]


/**
 * Task-queue handler that applies a single document change to the vector
 * index.
 *
 * Flow:
 *  1. If the index exists but is not deployed (or is still building), the
 *     same payload is re-enqueued with a one-hour delay and the handler
 *     returns.
 *  2. Otherwise the index name is read from the metadata document and the
 *     requested operation is applied:
 *       - `remove`: the datapoint with id `docId` is removed from the index.
 *       - `update`: the document is re-read from Firestore, a datapoint is
 *         generated for it, and the datapoint is upserted into the index.
 *
 * Failures while applying the operation are logged and swallowed (the
 * handler is best-effort and does not rethrow).
 *
 * @param data Task payload; expected to carry `operation`
 *   (`'update' | 'remove'`) and `docId` (the Firestore document id).
 */
export async function datapointWriteTaskHandler(data: any): Promise<void> {
  const {operation, docId} = data as {
    operation: 'update' | 'remove';
    docId: string;
  };

  // The payload is untyped at runtime; without a usable id there is nothing
  // to remove or upsert, and re-enqueueing it would only repeat the failure.
  if (typeof docId !== 'string' || docId.length === 0) {
    functions.logger.error('Missing docId in task payload, skipping...');
    return;
  }

  const queue = getFunctions().taskQueue(
    'datapointWriteTask',
    config.instanceId
  );

  const {index, status} = await checkIndexStatus();

  // If the index exists but isn't deployed yet, or is still building, retry
  // in an hour.
  const indexNotReady =
    (index && status !== IndexStatus.DEPLOYED) ||
    status === IndexStatus.BUILDING;

  if (indexNotReady) {
    functions.logger.info('Index not deployed yet, skipping...');

    // Index isn't ready yet, retry in an hour.
    await queue.enqueue(data, {
      scheduleDelaySeconds: 60 * 60,
    });
    return;
  }

  try {
    // Get the index name from the metadata document.
    const metadata = await admin.firestore().doc(config.metadataDoc).get();
    const indexName = metadata.data()?.index;
    if (!indexName) {
      functions.logger.error('Index not found');
      return;
    }

    switch (operation) {
      case 'remove':
        // Remove the datapoint from the index.
        await removeDatapoint(indexName, [docId]);
        break;
      case 'update': {
        const snap = await admin
          .firestore()
          .doc(`${config.collectionName}/${docId}`)
          .get();

        const docData = snap.data();

        // Nothing to embed if the document is gone or has no fields.
        if (!docData) return;
        if (Object.keys(docData).length === 0) return;

        const datapoint = await getDatapoint(snap.ref, docData);
        if (!datapoint) {
          functions.logger.info('No datapoint found, skipping...');
          return;
        }

        functions.logger.info('Datapoint generated 🎉');

        // Upsert the datapoint to the index.
        await upsertDatapoint(indexName, [
          {
            datapoint_id: datapoint.id,
            feature_vector: datapoint.embedding,
          },
        ]);
        break;
      }
      default:
        // Payloads come from the task queue and are not validated upstream.
        functions.logger.warn('Unknown operation, skipping...', {operation});
    }
  } catch (error) {
    // Axios errors carry the HTTP response; any other error (Firestore,
    // embedding generation, ...) has no `response`, so log the error itself
    // instead of `undefined`.
    const response = (error as AxiosError).response;
    functions.logger.error(response ?? error);
  }
}