in AzureSQL/csharp/DocumentVectorPipelineFunctions/BlobTriggerFunction.cs [62:155]
private async Task HandleBlobCreateEventAsync(BlobClient blobClient)
{
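// Read the embedding dimensions (falling back to DefaultDimensions) and the Azure SQL connection string from configuration.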
embeddingDimensions = configuration.GetValue<int>(AzureOpenAIModelDeploymentDimensionsName, DefaultDimensions);
var connectionString = configuration.GetValue<string>(SqlConnectionString);
_logger.LogInformation("Using OpenAI model dimensions: '{embeddingDimensions}'.", embeddingDimensions);
_logger.LogInformation("Analyzing document using DocumentAnalyzerService from blobUri: '{blobUri}' using layout: {layout}", blobClient.Name, "prebuilt-read");
using MemoryStream memoryStream = new MemoryStream();
await blobClient.DownloadToAsync(memoryStream);
memoryStream.Seek(0, SeekOrigin.Begin);
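// Extract the document's text with the prebuilt-read model, waiting for the analysis to complete.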
var operation = await documentAnalysisClient.AnalyzeDocumentAsync(
WaitUntil.Completed,
"prebuilt-read",
memoryStream);
var result = operation.Value;
_logger.LogInformation("Extracted content from '{name}', # pages {pageCount}", blobClient.Name, result.Pages.Count);
var textChunks = TextChunker.FixedSizeChunking(result);
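// Group the chunks into batches of at most MaxBatchSize so embeddings can be generated per batch.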
var listOfBatches = new List<List<TextChunk>>();
int totalChunksCount = 0;
var currentBatch = new List<TextChunk>(MaxBatchSize);
foreach (var chunk in textChunks)
{
currentBatch.Add(chunk);
totalChunksCount++;
if (currentBatch.Count >= MaxBatchSize)
{
listOfBatches.Add(new List<TextChunk>(currentBatch));
currentBatch.Clear();
}
}
if (currentBatch.Count > 0)
{
listOfBatches.Add(new List<TextChunk>(currentBatch));
}
_logger.LogInformation("Processing list of batches in parallel, total batches: {listSize}, chunks count: {chunksCount}", listOfBatches.Count(), totalChunksCount);
await EnsureDocumentTableExistsAsync(connectionString);
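// Process the batches in parallel (bounded by MaxDegreeOfParallelism): embed each batch, then write its rows to Azure SQL.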
await Parallel.ForEachAsync(listOfBatches, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, async (batchChunkTexts, cancellationToken) =>
{
_logger.LogInformation("Processing batch of size: {batchSize}", batchChunkTexts.Count);
if (batchChunkTexts.Count > 0)
{
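// Generate embeddings for the whole batch in a single call; retry handling lives in the helper.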
var embeddings = await GenerateEmbeddingsWithRetryAsync(batchChunkTexts);
_logger.LogInformation("Embeddings generated: {0}", embeddings.Count);
if (embeddings.Count > 0)
{
// Save into Azure SQL
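// Each chunk becomes one row; the embedding is stored as a JSON-serialized vector via a parameterized INSERT.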
_logger.LogInformation("Begin Saving data in Azure SQL");
for (int index = 0; index < batchChunkTexts.Count; index++)
{
using (var connection = new SqlConnection(connectionString))
{
string SanitizedName = SantizeDatabaseObjectName(TableSchemaName) + "." + SantizeDatabaseObjectName(TableName);
string insertQuery = $@"INSERT INTO {SanitizedName} (ChunkId, DocumentUrl, Embedding, ChunkText, PageNumber) VALUES (@ChunkId, @DocumentUrl, @Embedding, @ChunkText, @PageNumber);";
var doc = new Document()
{
ChunkId = batchChunkTexts[index].ChunkNumber,
DocumentUrl = blobClient.Uri.AbsoluteUri,
Embedding = JsonSerializer.Serialize(embeddings[index].Vector),
ChunkText = batchChunkTexts[index].Text,
PageNumber = batchChunkTexts[index].PageNumberIfKnown,
};
//connection.AccessToken = token.Token;
var result = connection.Execute(insertQuery, doc);
}
}
_logger.LogInformation("End Saving data in Azure SQL");
}
}
});
_logger.LogInformation("Finished processing blob {name}, total chunks processed {count}.", blobClient.Name, totalChunksCount);
}