in reference-implementations/semantic-search-for-images/src/ingestion/Services/DataIngestor.cs [39:77]
public async Task IngestDataAsync() {
IList<ImageMetadata> records;
await _databaseService.InitializeAsync();
await foreach (var blobItem in _blobService.GetImageCsvBlobsAsync())
{
try {
_logger.LogInformation($"Processing blob: {blobItem.Name}");
using var memoryStream = new MemoryStream();
await _blobService.DownloadImageCsvBlob(blobItem.Name, memoryStream);
records = _csvService.GetRecords(memoryStream);
}
catch(Exception ex) {
_logger.LogError(ex, $"Error processing blob: {blobItem.Name}. Skipping...");
continue;
}
await Parallel.ForEachAsync(records, async (record, token) => {
try {
record.imageVector = await _imageService.GenerateImageEmbeddings(record.imageUrl);
await _databaseService.UpsertItemAsync(record);
}
catch (ArgumentNullException ex) {
_logger.LogError(ex, $"Record with title: {record.title} and objectId: {record.objectId} had a null imageUrl. Skipping...");
return;
}
catch (Exception ex) {
_logger.LogError(ex, $"Error processing record with title: {record.title} and objectId: {record.objectId}. Skipping...");
return;
}
});
await _blobService.MoveBlobToProcessedContainer(blobItem);
}
}