in AzureCosmosDB/csharp/DocumentVectorPipelineFunctions/BlobTriggerFunction.cs [17:185]
public class BlobTriggerFunction(
IConfiguration configuration,
DocumentAnalysisClient documentAnalysisClient,
ILoggerFactory loggerFactory,
CosmosClient cosmosClient,
EmbeddingClient embeddingClient)
{
private readonly ILogger _logger = loggerFactory.CreateLogger<BlobTriggerFunction>();
private const string AzureOpenAIModelDeploymentDimensionsName = "AzureOpenAIModelDimensions";
private static readonly int DefaultDimensions = 1536;
private const string MaxTokensPerChunkName = "MaxTokensPerChunk";
private const string OverlapTokensName = "OverlapTokens";
private const int MaxRetryCount = 100;
private const int RetryDelay = 10 * 1000; // 10 seconds
private const int MaxBatchSize = 10;
private const int MaxDegreeOfParallelism = 50;
private int embeddingDimensions = DefaultDimensions;
[Function("BlobTriggerFunction")]
public async Task Run([BlobTrigger("documents/{name}", Connection = "AzureBlobStorageAccConnectionString")] BlobClient blobClient)
{
this._logger.LogInformation("Starting processing of blob name: '{name}'", blobClient.Name);
if (await blobClient.ExistsAsync())
{
await this.HandleBlobCreateEventAsync(blobClient);
}
else
{
await this.HandleBlobDeleteEventAsync(blobClient);
}
this._logger.LogInformation("Finished processing of blob name: '{name}'", blobClient.Name);
}
private async Task HandleBlobCreateEventAsync(BlobClient blobClient)
{
var cosmosDBClientWrapper = await CosmosDBClientWrapper.CreateInstance(cosmosClient, this._logger);
this.embeddingDimensions = configuration.GetValue<int>(AzureOpenAIModelDeploymentDimensionsName, DefaultDimensions);
this._logger.LogInformation("Using OpenAI model dimensions: '{embeddingDimensions}'.", this.embeddingDimensions);
var maxTokensPerChunk = configuration.GetValue<int>(MaxTokensPerChunkName, DocumentChunker.DefaultMaxTokensPerChunk);
var overlapTokens = configuration.GetValue<int>(OverlapTokensName, DocumentChunker.DefaultOverlapTokens);
var extension = Path.GetExtension(blobClient.Name);
var textChunks = new List<TextChunk>();
if (extension == ".txt")
{
using var stream = await blobClient.OpenReadAsync();
var lines = await ReadAllLinesAsync(stream);
textChunks.AddRange(DocumentChunker.ChunkTextLines(
lines, maxTokensPerChunk, overlapTokens));
}
else if (extension == ".md")
{
using var stream = await blobClient.OpenReadAsync();
var lines = await ReadAllLinesAsync(stream);
textChunks.AddRange(DocumentChunker.ChunkMarkdownLines(
lines, maxTokensPerChunk, overlapTokens));
}
else
{
this._logger.LogInformation("Analyzing document using DocumentAnalyzerService from blobUri: '{blobUri}' using layout: {layout}", blobClient.Name, "prebuilt-read");
using var memoryStream = new MemoryStream();
await blobClient.DownloadToAsync(memoryStream);
memoryStream.Seek(0, SeekOrigin.Begin);
var operation = await documentAnalysisClient.AnalyzeDocumentAsync(
WaitUntil.Completed,
"prebuilt-read",
memoryStream);
var result = operation.Value;
this._logger.LogInformation("Extracted content from '{name}', # pages {pageCount}", blobClient.Name, result.Pages.Count);
textChunks.AddRange(DocumentChunker.FixedSizeChunking(result, maxTokensPerChunk, overlapTokens));
}
var listOfBatches = textChunks.Chunk(MaxBatchSize).ToList();
this._logger.LogInformation("Processing list of batches in parallel, total batches: {listSize}, chunks count: {chunksCount}", listOfBatches.Count, textChunks.Count);
await Parallel.ForEachAsync(listOfBatches, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, async (batchChunkText, cancellationToken) =>
{
this._logger.LogInformation("Processing batch of size: {batchSize}", batchChunkText.Length);
await this.ProcessCurrentBatchAsync(blobClient, cosmosDBClientWrapper, [.. batchChunkText], cancellationToken);
});
this._logger.LogInformation("Finished processing blob {name}, total chunks processed {count}.", blobClient.Name, textChunks.Count);
}
private async Task ProcessCurrentBatchAsync(BlobClient blobClient, CosmosDBClientWrapper cosmosDBClientWrapper, List<TextChunk> batchChunkTexts, CancellationToken cancellationToken)
{
this._logger.LogInformation("Generating embeddings for batch of size: '{size}'.", batchChunkTexts.Count);
var embeddings = await this.GenerateEmbeddingsWithRetryAsync(batchChunkTexts);
this._logger.LogInformation("Creating Cosmos DB documents for batch of size {count}", batchChunkTexts.Count);
await cosmosDBClientWrapper.UpsertDocumentsAsync(blobClient.Uri.AbsoluteUri, batchChunkTexts, embeddings, cancellationToken);
}
private async Task<EmbeddingCollection> GenerateEmbeddingsWithRetryAsync(IEnumerable<TextChunk> batchChunkTexts)
{
var embeddingGenerationOptions = new EmbeddingGenerationOptions()
{
Dimensions = this.embeddingDimensions
};
var retryCount = 0;
while (retryCount < MaxRetryCount)
{
try
{
return await embeddingClient.GenerateEmbeddingsAsync(batchChunkTexts.Select(p => p.Text).ToList(), embeddingGenerationOptions);
}
catch (ClientResultException ex)
{
if (ex.Status is ((int)HttpStatusCode.TooManyRequests) or ((int)HttpStatusCode.Unauthorized))
{
if (retryCount >= MaxRetryCount)
{
throw new Exception($"Max retry attempts reached generating embeddings with exception: {ex}.");
}
retryCount++;
await Task.Delay(RetryDelay);
}
else
{
throw new Exception($"Failed to generate embeddings with error: {ex}.");
}
}
}
throw new Exception($"Failed to generate embeddings after retrying for ${MaxRetryCount} times.");
}
private async Task HandleBlobDeleteEventAsync(BlobClient blobClient)
{
// TODO (amisi) - Implement me.
this._logger.LogInformation("Handling delete event for blob name {name}.", blobClient.Name);
await Task.Delay(1);
}
private static async Task<List<string>> ReadAllLinesAsync(
Stream inputStream,
CancellationToken cancellationToken = default)
{
using var sr = new StreamReader(
inputStream, Encoding.UTF8, detectEncodingFromByteOrderMarks: true);
cancellationToken.ThrowIfCancellationRequested();
string? line;
var lines = new List<string>();
while ((line = await sr.ReadLineAsync(cancellationToken)) != null)
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
}
return lines;
}
}