in firestore-semantic-search/functions/src/functions/stream_update_datapoint.ts [28:97]
export async function streamUpdateDatapointHandler(
snap: FirebaseFirestore.DocumentSnapshot
) {
const {status, index} = await checkIndexStatus();
if (
(index && status !== IndexStatus.DEPLOYED) ||
status === IndexStatus.BUILDING
) {
functions.logger.info('Index not deployed yet, skipping...');
const queue = getFunctions().taskQueue(
'datapointWriteTask',
config.instanceId
);
// Index isn't ready yet, retry in an hour.
await queue.enqueue(
{
operation: 'remove',
docId: snap.id,
},
{
scheduleDelaySeconds: 60 * 60,
}
);
return;
}
if (!utils.isValidReference(snap.ref)) {
console.log(`Skipping ${snap.ref.path}`);
return;
}
const data = snap.data();
if (!data) return;
if (Object.keys(data).length === 0) return;
functions.logger.debug('Data to be embedded', {data});
const datapoint = await getDatapoint(snap.ref, data);
if (!datapoint) return;
functions.logger.info('Datapoint generated 🎉');
try {
// Get the index name from the metadata document.
functions.logger.info('Upserting datapoint to index', datapoint);
if (!index) {
functions.logger.error(
'Index not found, creating a new one and retrying...'
);
await backfillTriggerHandler({
forceCreateIndex: true,
document: snap,
});
return;
}
// Upsert the datapoint to the index.
await upsertDatapoint(index, [
{
datapoint_id: datapoint.id,
feature_vector: datapoint.embedding,
},
]);
} catch (error) {
functions.logger.error((error as AxiosError).response);
}
}