in storage-reverse-image-search/functions/src/functions/stream_update_datapoint.ts [29:99]
export async function streamUpdateDatapointHandler(
object: functions.storage.ObjectMetadata
) {
const doc = await checkIndexStatus();
const {status, index} = doc || {};
if (
(index && status !== IndexStatus.DEPLOYED) ||
status === IndexStatus.BUILDING
) {
functions.logger.info('Index not deployed yet, skipping...');
const queue = getFunctions().taskQueue(
'datapointWriteTask',
config.instanceId
);
// Index isn't ready yet, retry in an hour.
await queue.enqueue(
{
operation: 'update',
imagePath: object.name,
},
{
scheduleDelaySeconds: 60 * 60,
}
);
return;
}
if (!object.name) return;
if (!utils.isImage(object.name)) {
functions.logger.info(`Skipping ${object.name}, not an image...`);
return;
}
functions.logger.info(`Processing ${object.name}...`);
const imagePath = object.name;
const vector = await getFeatureVectors([imagePath]);
if (!vector) return;
functions.logger.info('Datapoint generated 🎉');
try {
// Get the index name from the metadata document.
const metdata = await admin.firestore().doc(config.metadataDoc).get();
const index = metdata.data()?.index;
if (!index) {
functions.logger.error(
'Index not found, creating a new one and retrying...'
);
await backfillTriggerHandler({
forceCreateIndex: true,
object: object,
});
return;
}
// Upsert the datapoint to the index.
await upsertDatapoint(index, [
{
datapoint_id: object.name,
feature_vector: vector[0],
},
]);
} catch (error) {
functions.logger.error((error as AxiosError).response);
}
}