in firestore-semantic-search/functions/src/functions/datapoint_write_task.ts [31:105]
export async function datapointWriteTaskHandler(data: any) {
const {operation, docId} = data as {
operation: 'update' | 'remove';
docId: string;
};
const queue = getFunctions().taskQueue(
'datapointWriteTask',
config.instanceId
);
const {index, status} = await checkIndexStatus();
// If the index exist but isn't deployed yet, or is still building retry in an hour.
if (
(index && status !== IndexStatus.DEPLOYED) ||
status === IndexStatus.BUILDING
) {
functions.logger.info('Index not deployed yet, skipping...');
// Index isn't ready yet, retry in an hour.
await queue.enqueue(data, {
scheduleDelaySeconds: 60 * 60,
});
return;
}
try {
// Get the index name from the metadata document.
const metdata = await admin.firestore().doc(config.metadataDoc).get();
const index = metdata.data()?.index;
if (!index) {
functions.logger.error('Index not found');
return;
}
switch (operation) {
case 'remove':
// Upsert the datapoint to the index.
await removeDatapoint(index, [docId]);
break;
case 'update':
{
const snap = await admin
.firestore()
.doc(`${config.collectionName}/${docId}`)
.get();
const data = snap.data();
if (!data) return;
if (Object.keys(data).length === 0) return;
const datapoint = await getDatapoint(snap.ref, data);
if (!datapoint) {
functions.logger.info('No datapoint found, skipping...');
return;
}
functions.logger.info('Datapoint generated 🎉');
// Upsert the datapoint to the index.
await upsertDatapoint(index, [
{
datapoint_id: datapoint.id,
feature_vector: datapoint.embedding,
},
]);
}
break;
}
} catch (error) {
functions.logger.error((error as AxiosError).response);
}
}