public static async Task Run()

in src/FhirImporter/FhirBundleBlobTrigger.cs [31:184]


        public static async Task Run([BlobTrigger("fhirimport/{name}", Connection = "AzureWebJobsStorage")]Stream myBlob, string name, ILogger log)
        {
            log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes");

            var streamReader = new StreamReader(myBlob);
            var fhirString = await streamReader.ReadToEndAsync();
            
            JObject bundle;
            JArray entries;
            try
            {
                bundle = JObject.Parse(fhirString);
            }
            catch (JsonReaderException)
            {
                log.LogError("Input file is not a valid JSON document");
                await MoveBlobToRejected(name, log);
                return;
            }

            log.LogInformation("File read");

            try
            {
                FhirImportReferenceConverter.ConvertUUIDs(bundle);
            }
            catch
            {
                log.LogError("Failed to resolve references in doc");
                await MoveBlobToRejected(name, log);
                return;
            }

            try
            {
                entries = (JArray)bundle["entry"];
                if (entries == null)
                {
                    log.LogError("No entries found in bundle");
                    throw new FhirImportException("No entries found in bundle");
                }


                AuthenticationContext authContext;
                ClientCredential clientCredential;
                AuthenticationResult authResult;

                string authority = System.Environment.GetEnvironmentVariable("Authority");
                string audience = System.Environment.GetEnvironmentVariable("Audience");
                string clientId = System.Environment.GetEnvironmentVariable("ClientId");
                string clientSecret = System.Environment.GetEnvironmentVariable("ClientSecret");
                Uri fhirServerUrl = new Uri(System.Environment.GetEnvironmentVariable("FhirServerUrl"));

                int maxDegreeOfParallelism;
                if (!int.TryParse(System.Environment.GetEnvironmentVariable("MaxDegreeOfParallelism"), out maxDegreeOfParallelism))
                {
                    maxDegreeOfParallelism = 16;
                }

                try
                {
                    authContext = new AuthenticationContext(authority);
                    clientCredential = new ClientCredential(clientId, clientSecret);
                    authResult = authContext.AcquireTokenAsync(audience, clientCredential).Result;
                }
                catch (Exception ee)
                {
                    log.LogCritical(string.Format("Unable to obtain token to access FHIR server in FhirImportService {0}", ee.ToString()));
                    throw;
                }

                //var entriesNum = Enumerable.Range(0,entries.Count-1);
                var actionBlock = new ActionBlock<int>(async i =>
                {
                    var entry_json = ((JObject)entries[i])["resource"].ToString();
                    string resource_type = (string)((JObject)entries[i])["resource"]["resourceType"];
                    string id = (string)((JObject)entries[i])["resource"]["id"];
                    var randomGenerator = new Random();

                    Thread.Sleep(TimeSpan.FromMilliseconds(randomGenerator.Next(50)));

                    if (string.IsNullOrEmpty(entry_json))
                    {
                        log.LogError("No 'resource' section found in JSON document");
                        throw new FhirImportException("'resource' not found or empty");
                    }

                    if (string.IsNullOrEmpty(resource_type))
                    {
                        log.LogError("No resource_type found.");
                        throw new FhirImportException("No resource_type in resource.");
                    }

                    StringContent content = new StringContent(entry_json, Encoding.UTF8, "application/json");
                    var pollyDelays =
                            new[]
                            {
                                TimeSpan.FromMilliseconds(2000 + randomGenerator.Next(50)),
                                TimeSpan.FromMilliseconds(3000 + randomGenerator.Next(50)),
                                TimeSpan.FromMilliseconds(5000 + randomGenerator.Next(50)),
                                TimeSpan.FromMilliseconds(8000 + randomGenerator.Next(50))
                            };


                    HttpResponseMessage uploadResult = await Policy
                        .HandleResult<HttpResponseMessage>(response => !response.IsSuccessStatusCode)
                        .WaitAndRetryAsync(pollyDelays, (result, timeSpan, retryCount, context) =>
                        {
                            log.LogWarning($"Request failed with {result.Result.StatusCode}. Waiting {timeSpan} before next retry. Retry attempt {retryCount}");
                        })
                        .ExecuteAsync(() => {                            
                            var message = string.IsNullOrEmpty(id)
                                ? new HttpRequestMessage(HttpMethod.Post, new Uri(fhirServerUrl, $"/{resource_type}"))
                                : new HttpRequestMessage(HttpMethod.Put, new Uri(fhirServerUrl, $"/{resource_type}/{id}"));

                            message.Content = content;
                            message.Headers.Authorization = new AuthenticationHeaderValue("Bearer", authResult.AccessToken);
                            return httpClient.SendAsync(message);
                        });

                    if (!uploadResult.IsSuccessStatusCode)
                    {
                        string resultContent = await uploadResult.Content.ReadAsStringAsync();
                        log.LogError(resultContent);

                        // Throwing a generic exception here. This will leave the blob in storage and retry.
                        throw new Exception($"Unable to upload to server. Error code {uploadResult.StatusCode}");
                    }
                    else
                    {
                        log.LogInformation($"Uploaded /{resource_type}/{id}");
                    }
                },
                    new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = maxDegreeOfParallelism
                    }
                );

                for (var i = 0; i < entries.Count; i++)
                {
                    actionBlock.Post(i);
                }
                actionBlock.Complete();
                actionBlock.Completion.Wait();

                // We are done with this blob, upload was successful, it will be delete
                await GetBlobReference("fhirimport", name, log).DeleteIfExistsAsync();
            }
            catch (FhirImportException)
            {
                await MoveBlobToRejected(name, log);
            }
        }