private async Task HandleBlobCreateEventAsync()

in AzureSQL/csharp/DocumentVectorPipelineFunctions/BlobTriggerFunction.cs [62:155]


    private async Task HandleBlobCreateEventAsync(BlobClient blobClient)
    {
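        // Read configuration once per invocation: the embedding dimension
        // count and the Azure SQL connection string.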
        embeddingDimensions = configuration.GetValue<int>(AzureOpenAIModelDeploymentDimensionsName, DefaultDimensions);
        var connectionString = configuration.GetValue<string>(SqlConnectionString);
        _logger.LogInformation("Using OpenAI model dimensions: '{embeddingDimensions}'.", embeddingDimensions);

        _logger.LogInformation("Analyzing document using DocumentAnalyzerService from blobUri: '{blobUri}' using layout: {layout}", blobClient.Name, "prebuilt-read");

        // Buffer the blob into a seekable in-memory stream for analysis.
        using MemoryStream memoryStream = new MemoryStream();
        await blobClient.DownloadToAsync(memoryStream);
        memoryStream.Seek(0, SeekOrigin.Begin);

        // Extract text with the prebuilt-read model, waiting for completion.
        var operation = await documentAnalysisClient.AnalyzeDocumentAsync(
            WaitUntil.Completed,
            "prebuilt-read",
            memoryStream);

        var result = operation.Value;
        _logger.LogInformation("Extracted content from '{name}', # pages {pageCount}", blobClient.Name, result.Pages.Count);

        var textChunks = TextChunker.FixedSizeChunking(result);

        // Group chunks into fixed-size batches so each embeddings request
        // stays within MaxBatchSize.
        var listOfBatches = new List<List<TextChunk>>();

        int totalChunksCount = 0;
        var batchChunkTexts = new List<TextChunk>(MaxBatchSize);
        foreach (var chunk in textChunks)
        {
            batchChunkTexts.Add(chunk);
            totalChunksCount++;

            if (batchChunkTexts.Count >= MaxBatchSize)
            {
                listOfBatches.Add(new List<TextChunk>(batchChunkTexts));
                batchChunkTexts.Clear();
            }
        }

        // Flush the final partial batch, if any.
        if (batchChunkTexts.Count > 0)
        {
            listOfBatches.Add(new List<TextChunk>(batchChunkTexts));
        }

        _logger.LogInformation("Processing list of batches in parallel, total batches: {listSize}, chunks count: {chunksCount}", listOfBatches.Count(), totalChunksCount);

        await EnsureDocumentTableExistsAsync(connectionString);

        await Parallel.ForEachAsync(listOfBatches, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, async (batchChunkTexts, cancellationToken) =>
        {
            _logger.LogInformation("Processing batch of size: {batchSize}", batchChunkTexts.Count);

            if (batchChunkTexts.Count > 0)
            {
                var embeddings = await GenerateEmbeddingsWithRetryAsync(batchChunkTexts);
                _logger.LogInformation("Embeddings generated: {embeddingsCount}", embeddings.Count);

                if (embeddings.Count > 0)
                {
                    // Save into Azure SQL: one row per chunk, reusing a single
                    // connection for the whole batch.
                    _logger.LogInformation("Begin saving data in Azure SQL");

                    string sanitizedName = SantizeDatabaseObjectName(TableSchemaName) + "." + SantizeDatabaseObjectName(TableName);
                    string insertQuery = $@"INSERT INTO {sanitizedName} (ChunkId, DocumentUrl, Embedding, ChunkText, PageNumber) VALUES (@ChunkId, @DocumentUrl, @Embedding, @ChunkText, @PageNumber);";

                    using (var connection = new SqlConnection(connectionString))
                    {
                        for (int index = 0; index < batchChunkTexts.Count; index++)
                        {
                            var doc = new Document()
                            {
                                ChunkId = batchChunkTexts[index].ChunkNumber,
                                DocumentUrl = blobClient.Uri.AbsoluteUri,
                                Embedding = JsonSerializer.Serialize(embeddings[index].Vector),
                                ChunkText = batchChunkTexts[index].Text,
                                PageNumber = batchChunkTexts[index].PageNumberIfKnown,
                            };
                            await connection.ExecuteAsync(insertQuery, doc);
                        }
                    }

                    _logger.LogInformation("End saving data in Azure SQL");
                }
            }
        });

        _logger.LogInformation("Finished processing blob {name}, total chunks processed {count}.", blobClient.Name, totalChunksCount);
    }
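
The method depends on two private helpers that fall outside this excerpt. Below is a minimal sketch of the embedding retry helper, assuming the class holds an IEmbeddingGenerator<string, Embedding<float>> from Microsoft.Extensions.AI in an embeddingGenerator field; the client type, field name, attempt budget, and backoff schedule are all illustrative assumptions, not the repository's actual choices:

    private async Task<IReadOnlyList<Embedding<float>>> GenerateEmbeddingsWithRetryAsync(List<TextChunk> chunks)
    {
        const int maxAttempts = 5; // assumed retry budget
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                // Embed the whole batch in one request; result order matches 'chunks'.
                var generated = await embeddingGenerator.GenerateAsync(chunks.Select(c => c.Text));
                return generated.ToList();
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                // Back off exponentially on transient failures such as throttling;
                // once attempts are exhausted the exception propagates to the caller.
                _logger.LogWarning(ex, "Embedding attempt {attempt} failed, retrying.", attempt);
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
            }
        }
    }

And a sketch of the table bootstrap, assuming the embedding is persisted as the JSON string this method serializes; the real schema may differ, for example by using a dedicated vector column type:

    private async Task EnsureDocumentTableExistsAsync(string? connectionString)
    {
        string tableName = SantizeDatabaseObjectName(TableSchemaName) + "." + SantizeDatabaseObjectName(TableName);

        // Create the target table on first run; subsequent runs are no-ops.
        string createTable = $@"IF OBJECT_ID(N'{tableName}', N'U') IS NULL
            CREATE TABLE {tableName} (
                ChunkId INT NOT NULL,
                DocumentUrl NVARCHAR(2048) NOT NULL,
                Embedding NVARCHAR(MAX) NOT NULL,
                ChunkText NVARCHAR(MAX) NOT NULL,
                PageNumber INT NULL);";

        using var connection = new SqlConnection(connectionString);
        await connection.ExecuteAsync(createTable); // Dapper opens the connection as needed
    }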