public async Task ProcessImport()

in src/ApiForFhirMigrationTool.Function/ImportOrchestrator.cs [98:355]


        public async Task<ResponseModel> ProcessImport([ActivityTrigger] string requestContent, FunctionContext executionContext)
        {
            _logger?.LogInformation("Import process Started");
            ResponseModel importResponse = new ResponseModel();
            HttpMethod method = HttpMethod.Post;
            try
            {
                _logger?.LogInformation("Creating table clients");
                TableClient exportTableClient = _azureTableClientFactory.Create(_options.ExportTableName);
                TableClient chunktableClient = _azureTableClientFactory.Create(_options.ChunkTableName);
                _logger?.LogInformation("Table clients created successfully.");

                _logger?.LogInformation("Querying the export table to check for completed export jobs.");
                Pageable<TableEntity> jobListimport = exportTableClient.Query<TableEntity>(filter: ent => ent.GetBoolean("IsExportComplete") == true && ent.GetString("ImportRequest") == "Yes" && ent.GetBoolean("IsProcessed") == false && ent.GetBoolean("IsFirst") == true);
                _logger?.LogInformation("Query completed");

                if (jobListimport != null && jobListimport.Count() == 1)
                { 
                    _logger?.LogInformation("Starting import activities.");
                    var item = jobListimport.First();

                    _logger?.LogInformation("Retrieving export content location.");
                    string? statusUrl_new = item.GetString("exportContentLocation");
                    _logger?.LogInformation("Export content location retrieved successfully.");

                    _logger?.LogInformation("Retrieving export process Id from export content location.");
                    string statusId = GetProcessId(statusUrl_new);
                    _logger?.LogInformation("Export process Id retrieved successfully.");

                    string containerName = $"import-{statusId}";
                    BlobContainerClient containerClient = _azureBlobClientFactory.GetBlobContainerClient(containerName);
                    int blobCount = containerClient.GetBlobs().Count();
                    int payloadCounter = 0;
                    foreach (BlobItem blobItem in containerClient.GetBlobs())
                    {
                        if (payloadCounter < _options.PayloadCount && containerClient.GetBlobs().Count() > 0)
                        {
                            BlobClient blobClient = containerClient.GetBlobClient(blobItem.Name);
                            BlobDownloadInfo download = blobClient.Download();

                            using (var streamReader = new StreamReader(download.Content))
                            {
                                string content = await streamReader.ReadToEndAsync();
                                string statusUrl = String.Empty;

                                method = HttpMethod.Post;
                                _logger?.LogInformation($"Retrieving the import payload from '{containerName}' and posting it to the FHIR service.");
                                importResponse = await _importProcessor.CallProcess(method, content, _options.DestinationUri, "/$import", _options.DestinationHttpClient);
                                _logger?.LogInformation("Successfully posted the import payload to the FHIR service.");

                                if (importResponse.Status == ResponseStatus.Accepted)
                                {
                                    _logger?.LogInformation($"Import  returned: Success.");
                                   
                                    Pageable<TableEntity> jobListimport1 = exportTableClient.Query<TableEntity>(filter: ent => ent.GetBoolean("IsExportComplete") == true && ent.GetString("ImportRequest") == "Yes" && ent.GetString("ExportId") == statusId && ent.GetString("IsImportRunning") == "Not Started" && ent.GetBoolean("IsFirst") == true);
                                    if (jobListimport1.Count() == 1)
                                    {
                                        statusUrl = importResponse.Content;
                                        TableEntity exportEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, item.RowKey);
                                        exportEntity["IsImportComplete"] = false;
                                        exportEntity["IsImportRunning"] = "Started";
                                        exportEntity["importContentLocation"] = importResponse.Content;
                                        exportEntity["ImportStartTime"] = DateTime.UtcNow;
                                        exportEntity["ImportNo"] = blobItem.Name;

                                        _logger?.LogInformation("Starting update of the export table.");
                                        _azureTableMetadataStore.UpdateEntity(exportTableClient, exportEntity);
                                        _logger?.LogInformation("Completed update of the export table.");

                                        _logger?.LogInformation("Updating logs in Application Insights.");
                                        _telemetryClient.TrackEvent(
                                        "Import",
                                        new Dictionary<string, string>()
                                        {
                                            { "ImportId", _orchestrationHelper.GetProcessId(statusUrl) },
                                            { "StatusUrl", statusUrl },
                                            { "ImportStatus", "Started" },
                                        });
                                        _logger?.LogInformation("Logs updated successfully in Application Insights.");
                                    }
                                    else
                                    {
                                        TableEntity qEntity = _azureTableMetadataStore.GetEntity(chunktableClient, _options.PartitionKey, _options.RowKey);
                                        if (qEntity["ImportId"] != null)
                                        {
                                            int importId = (int)qEntity["ImportId"];
                                            string rowKey = _options.RowKey + statusId + importId++;

                                            var tableEntity = new TableEntity(_options.PartitionKey, rowKey)
                                            {
                                                { "exportContentLocation", item.GetString("exportContentLocation") },
                                                { "importContentLocation", importResponse.Content },
                                                { "IsExportComplete", true },
                                                { "IsExportRunning", "Completed" },
                                                { "IsImportComplete", false },
                                                { "IsImportRunning", "Started" },
                                                { "ImportRequest", "Yes" },
                                                { "Since",item.GetString("Since") },
                                                { "Till", item.GetString("Till") },
                                                { "StartTime", item.GetDateTime("StartTime") },
                                                {"TotalExportResourceCount",item.GetString("TotalExportResourceCount") },
                                                { "ImportStartTime", DateTime.UtcNow },
                                                {"ExportEndTime",item.GetDateTime("ExportEndTime")  },
                                                { "ExportId",  statusId },
                                                { "ImportNo",blobItem.Name},
                                            };
                                            _logger?.LogInformation("Starting update of the export table.");
                                            _azureTableMetadataStore.AddEntity(exportTableClient, tableEntity);
                                            _logger?.LogInformation("Completed update of the export table.");

                                            TableEntity qEntitynew = _azureTableMetadataStore.GetEntity(chunktableClient, _options.PartitionKey, _options.RowKey);
                                            qEntitynew["ImportId"] = importId++;

                                            _logger?.LogInformation("Starting update of the chunk table.");
                                            _azureTableMetadataStore.UpdateEntity(chunktableClient, qEntitynew);
                                            _logger?.LogInformation("Completed update of the chunk table.");

                                            _logger?.LogInformation("Updating logs in Application Insights.");
                                            _telemetryClient.TrackEvent(
                                        "Import",
                                            new Dictionary<string, string>()
                                            {
                                                { "ImportId", _orchestrationHelper.GetProcessId(statusUrl) },
                                                { "StatusUrl", statusUrl },
                                                { "ImportStatus", "Started" },
                                                });
                                            _logger?.LogInformation("Logs updated successfully in Application Insights.");
                                        }
                                        
                                    }
                                }
                                else
                                {
                                    _logger?.LogInformation($"Import  returned: Failure");
                                    Pageable<TableEntity> jobListimport1 = exportTableClient.Query<TableEntity>(filter: ent => ent.GetBoolean("IsExportComplete") == true && ent.GetString("ImportRequest") == "Yes" && ent.GetString("ExportId") == statusId && ent.GetString("IsImportRunning") == "Not Started" && ent.GetBoolean("IsFirst") == true);
                                    if (jobListimport1.Count() == 1)
                                    {
                                        string diagnosticsValue = JObject.Parse(importResponse.Content)?["issue"]?[0]?["diagnostics"]?.ToString() ?? "For more information check Content location.";
                                        _logger?.LogInformation($"Import Status check returned: Unsuccessful. Reason : {diagnosticsValue}");
                                        TableEntity exportEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, item.RowKey);
                                        exportEntity["IsImportComplete"] = false;
                                        exportEntity["IsImportRunning"] = "failed";
                                        exportEntity["FailureReason"] = diagnosticsValue;
                                        exportEntity["ImportNo"] = blobItem.Name;
                                        exportEntity["IsProcessed"] = true;

                                        _logger?.LogInformation("Starting update of the export table.");
                                        _azureTableMetadataStore.UpdateEntity(exportTableClient, exportEntity);
                                        _logger?.LogInformation("Completed update of the export table.");

                                        _logger?.LogInformation("Updating logs in Application Insights.");
                                        _telemetryClient.TrackEvent(
                                        "Import",
                                        new Dictionary<string, string>()
                                        {
                                            { "ImportId", _orchestrationHelper.GetProcessId(statusUrl) },
                                            { "StatusUrl", statusUrl },
                                            { "ImportStatus", "Failed" },
                                            { "FailureReason", diagnosticsValue }
                                        });
                                        _logger?.LogInformation("Logs updated successfully in Application Insights.");

                                    }
                                    else
                                    {
                                        _logger?.LogInformation($"Import  returned: Failure");
                                        TableEntity qEntity = _azureTableMetadataStore.GetEntity(chunktableClient, _options.PartitionKey, _options.RowKey);
                                        if (qEntity["ImportId"] != null)
                                        {
                                            int importId = (int)qEntity["ImportId"];
                                            string rowKey = _options.RowKey + statusId + importId++;
                                            string diagnosticsValue = JObject.Parse(importResponse.Content)?["issue"]?[0]?["diagnostics"]?.ToString() ?? "For more information check Content location.";
                                            _logger?.LogInformation($"Import Status check returned: Unsuccessful. Reason : {diagnosticsValue}");

                                            var tableEntity = new TableEntity(_options.PartitionKey, rowKey)
                                            {
                                                { "exportContentLocation", item.GetString("exportContentLocation") },
                                                { "IsExportComplete", true },
                                                { "IsExportRunning", "Completed" },
                                                { "IsImportComplete", false },
                                                { "IsImportRunning", "failed" },
                                                { "FailureReason",diagnosticsValue},
                                                { "ImportRequest", "Yes" },
                                                { "Since",item.GetString("Since") },
                                                { "Till", item.GetString("Till") },
                                                { "StartTime", item.GetDateTime("StartTime") },
                                                {"TotalExportResourceCount",item.GetString("TotalExportResourceCount") },
                                                {"ExportEndTime",item.GetDateTime("ExportEndTime")  },
                                                { "ExportId",  statusId },
                                                { "ImportNo",blobItem.Name},
                                                { "IsProcessed",true }

                                            };
                                            _logger?.LogInformation("Starting update of the export table.");
                                            _azureTableMetadataStore.AddEntity(exportTableClient, tableEntity);
                                            _logger?.LogInformation("Completed update of the export table.");

                                            TableEntity qEntitynew = _azureTableMetadataStore.GetEntity(chunktableClient, _options.PartitionKey, _options.RowKey);
                                            qEntitynew["ImportId"] = importId++;

                                            _logger?.LogInformation("Starting update of the chunk table.");
                                            _azureTableMetadataStore.UpdateEntity(chunktableClient, qEntitynew);
                                            _logger?.LogInformation("Completed update of the chunk table.");

                                            _logger?.LogInformation("Updating logs in Application Insights.");
                                            _telemetryClient.TrackEvent(
                                        "Import",
                                            new Dictionary<string, string>()
                                            {
                                                { "ImportId", _orchestrationHelper.GetProcessId(statusUrl) },
                                                { "StatusUrl", statusUrl },
                                                { "ImportStatus", "Failed" },
                                                { "FailureReason", diagnosticsValue }
                                            });
                                            _logger?.LogInformation("Logs updated successfully in Application Insights.");

                                        }
                                    }
                                }

                            }
                            payloadCounter++;
                            string newContainerName = $"importprocessed-{statusId}";
                            _logger?.LogInformation($"Created container '{newContainerName}' for storing processed import payloads.");
                            BlobContainerClient newContainerClient = _azureBlobClientFactory.GetBlobContainerClient(newContainerName);
                            await newContainerClient.CreateIfNotExistsAsync();

                            string newBlobName = $"{blobItem.Name}";
                            BlobClient newBlobClient = newContainerClient.GetBlobClient(newBlobName);
                            BlobDownloadInfo download1 = await blobClient.DownloadAsync();
                            using (StreamReader reader = new StreamReader(download1.Content))
                            {
                                string content = await reader.ReadToEndAsync();
                                string jsonContent = content.ToString();
                                using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(jsonContent)))
                                {
                                    await newBlobClient.UploadAsync(stream);
                                }
                            }
                            _logger?.LogInformation($"Successfully stored processed import payloads in container '{newContainerName}'.");
                            await blobClient.DeleteIfExistsAsync();
                        }

                    }
                }
                else
                {
                    importResponse = await _importProcessor.CallProcess(method, requestContent, _options.DestinationUri, "/$import", _options.DestinationHttpClient);
                }
            }

            catch
            {
                throw;
            }
            _logger?.LogInformation($"Import process Finished");
            return importResponse;
        }