in src/ApiForFhirMigrationTool.Function/ImportStatusOrchestrator.cs [49:432]
// Orchestrator that monitors the import job the export table currently lists as
// "Started" or "Running". It polls ProcessImportStatusCheck until the job is
// reported Completed or reports any other non-Accepted status, refreshing the
// export/chunk tables and Application Insights after every poll.
//
// context        - durable orchestration context (activities, timers, replay-safe logger).
// requestContent - orchestration input; it is not read by this orchestrator.
// Returns the literal "completed" when there is nothing (left) to monitor.
// Throws HttpFailureException, after all bookkeeping is written, when the status
// check returns something other than Accepted or Completed.
//
// NOTE(review): table writes and telemetry are issued directly from orchestrator
// code, so they presumably run again on replay - consider moving them to activities.
public async Task<string> ImportStatusOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context, string requestContent)
{
    ILogger logger = context.CreateReplaySafeLogger(nameof(ImportStatusOrchestration));
    logger.LogInformation("Starting import status activities.");

    logger.LogInformation("Creating table clients");
    TableClient exportTableClient = _azureTableClientFactory.Create(_options.ExportTableName);
    TableClient chunkTableClient = _azureTableClientFactory.Create(_options.ChunkTableName);
    logger.LogInformation("Table clients created successfully.");

    logger.LogInformation(" Query the export table to check for running or started import jobs.");

    // FirstOrDefault enumerates the pageable once; Count() followed by First() ran the query twice.
    TableEntity? runningJob = exportTableClient
        .Query<TableEntity>(filter: ent => ent.GetString("IsImportRunning") == "Started" || ent.GetString("IsImportRunning") == "Running")
        .FirstOrDefault();
    logger.LogInformation("Query completed");

    if (runningJob != null)
    {
        // The status URL comes from the queried entity and never changes between polls.
        logger.LogInformation("Retrieving import content location.");
        string statusUrl = runningJob.GetString("importContentLocation");
        logger.LogInformation("Import content location retrieved successfully.");

        bool isComplete = false;
        while (!isComplete)
        {
            logger.LogInformation("Calling ProcessImportStatusCheck function");
            ResponseModel response = await context.CallActivityAsync<ResponseModel>(nameof(ProcessImportStatusCheck), statusUrl);
            logger.LogInformation("ProcessImportStatusCheck function has completed.");

            if (response.Status == ResponseStatus.Accepted)
            {
                await RecordRunningImportAndWaitAsync(context, logger, exportTableClient, runningJob, statusUrl);
            }
            else if (response.Status == ResponseStatus.Completed)
            {
                await FinalizeCompletedImportAsync(context, logger, exportTableClient, chunkTableClient, runningJob, statusUrl, response);
                isComplete = true;
            }
            else
            {
                await FinalizeFailedImportAsync(context, logger, exportTableClient, runningJob, statusUrl, response);

                // Report the status-check response itself; the previous message was built from a
                // never-assigned HttpResponseMessage and an un-awaited Task.
                throw new HttpFailureException($"StatusCode: {response.Status}, Response: {response.Content} ");
            }
        }
    }

    logger.LogInformation("Completed import status activities.");
    return "completed";
}

/// <summary>
/// Handles an Accepted (still running) status: marks the export row as "Running", refreshes the
/// source/destination totals, emits the "Import" event and sleeps on a durable timer.
/// </summary>
private async Task RecordRunningImportAndWaitAsync(
    TaskOrchestrationContext context,
    ILogger logger,
    TableClient exportTableClient,
    TableEntity runningJob,
    string statusUrl)
{
    logger.LogInformation("Import Status check returned: InProgress.");
    logger.LogInformation($"Waiting for next status check for {_options.ScheduleInterval} minutes.");

    // Computed before the activity calls below, exactly as before, so the wake-up time is
    // measured from the moment the status check returned.
    DateTime waitTime = context.CurrentUtcDateTime.Add(TimeSpan.FromMinutes(
        Convert.ToDouble(_options.ScheduleInterval)));

    TableEntity exportEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, runningJob.RowKey);
    logger.LogInformation("Successfully retrieved entities from the export table.");
    exportEntity["IsImportRunning"] = "Running";
    logger.LogInformation("Updated 'IsImportRunning' status of the entity to 'Running'.");

    var (sourceTotal, destinationTotal) = await FetchResourceTotalsAsync(context, logger);
    ApplyResourceTotals(exportEntity, sourceTotal, destinationTotal);

    logger.LogInformation("Starting update of the export table.");
    _azureTableMetadataStore.UpdateEntity(exportTableClient, exportEntity);
    logger.LogInformation("Completed update of the export table.");

    logger.LogInformation("Updating logs in Application Insights.");
    _telemetryClient.TrackEvent("Import", BuildImportEventProperties(statusUrl, "Running", sourceTotal, destinationTotal));
    logger.LogInformation("Logs updated successfully in Application Insights.");

    await context.CreateTimer(waitTime, CancellationToken.None);
}

/// <summary>
/// Handles a Completed status: stores the imported resource count and totals on the export row,
/// records the payload completion (advancing the chunk table when every payload is done) and
/// emits the "Import" event with status "Completed".
/// </summary>
private async Task FinalizeCompletedImportAsync(
    TaskOrchestrationContext context,
    ILogger logger,
    TableClient exportTableClient,
    TableClient chunkTableClient,
    TableEntity runningJob,
    string statusUrl,
    ResponseModel response)
{
    // Stays empty when the response has no content or no non-empty "output" array.
    string resourceCount = string.Empty;
    if (!string.IsNullOrEmpty(response.Content))
    {
        JObject objResponse = JObject.Parse(response.Content);
        if (objResponse["output"] is JArray objOutput && objOutput.Any())
        {
            resourceCount = _orchestrationHelper.CalculateSumOfResources(objOutput).ToString();
        }
    }

    logger.LogInformation("Import Status check returned: Success.");
    TableEntity exportEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, runningJob.RowKey);
    exportEntity["IsImportComplete"] = true;
    exportEntity["IsImportRunning"] = "Completed";

    // Orchestrator code must not read the wall clock; the context clock is replay-safe.
    exportEntity["EndTime"] = context.CurrentUtcDateTime;
    exportEntity["TotalImportResourceCount"] = resourceCount;

    var (sourceTotal, destinationTotal) = await FetchResourceTotalsAsync(context, logger);
    ApplyResourceTotals(exportEntity, sourceTotal, destinationTotal);

    logger.LogInformation("Starting to update the total resource count in the export table.");
    _azureTableMetadataStore.UpdateEntity(exportTableClient, exportEntity);
    logger.LogInformation("Successfully updated the total resource count in the export table.");

    RecordPayloadCompletion(
        exportTableClient,
        logger,
        () => AdvanceChunkState(chunkTableClient, exportEntity, logger));

    logger.LogInformation("Updating logs in Application Insights.");
    Dictionary<string, string> properties = BuildImportEventProperties(statusUrl, "Completed", sourceTotal, destinationTotal);
    properties["TotalImportResources"] = resourceCount;
    properties["TotalExportResources"] = runningJob.GetString("TotalExportResourceCount");
    _telemetryClient.TrackEvent("Import", properties);
    logger.LogInformation("Logs updated successfully in Application Insights.");
}

/// <summary>
/// Handles any status other than Accepted/Completed: marks the export row as "Failed" with the
/// failure reason and totals, records the payload completion and emits the "Import" event with
/// status "Failed". The caller is responsible for throwing afterwards.
/// </summary>
private async Task FinalizeFailedImportAsync(
    TaskOrchestrationContext context,
    ILogger logger,
    TableClient exportTableClient,
    TableEntity runningJob,
    string statusUrl,
    ResponseModel response)
{
    string diagnosticsValue = ExtractFailureDiagnostics(response.Content, logger);
    logger.LogInformation($"Import Status check returned: Unsuccessful. Reason : {diagnosticsValue}");

    TableEntity exportEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, runningJob.RowKey);
    exportEntity["IsImportComplete"] = true;
    exportEntity["IsImportRunning"] = "Failed";

    // Orchestrator code must not read the wall clock; the context clock is replay-safe.
    exportEntity["EndTime"] = context.CurrentUtcDateTime;

    var (sourceTotal, destinationTotal) = await FetchResourceTotalsAsync(context, logger);
    ApplyResourceTotals(exportEntity, sourceTotal, destinationTotal);
    exportEntity["FailureReason"] = diagnosticsValue;

    logger.LogInformation("Starting to update the total resource count in the export table.");
    _azureTableMetadataStore.UpdateEntity(exportTableClient, exportEntity);
    logger.LogInformation("Successfully updated the total resource count in the export table.");

    // A failed payload still counts as finished, but it never advances the chunk table.
    RecordPayloadCompletion(exportTableClient, logger, onAllPayloadsCompleted: null);

    logger.LogInformation("Updating logs in Application Insights.");
    Dictionary<string, string> properties = BuildImportEventProperties(statusUrl, "Failed", sourceTotal, destinationTotal);
    properties["FailureReason"] = diagnosticsValue;
    _telemetryClient.TrackEvent("Import", properties);
    logger.LogInformation("Logs updated successfully in Application Insights.");
}

/// <summary>
/// Calls the GetTotalFromFhirAsync activity for the source and then the destination server and
/// returns both (count, error) tuples.
/// </summary>
private async Task<(Tuple<int?, string> Source, Tuple<int?, string> Destination)> FetchResourceTotalsAsync(
    TaskOrchestrationContext context,
    ILogger logger)
{
    Tuple<Uri, string> source = new Tuple<Uri, string>(_options.SourceUri, _options.SourceHttpClient);
    Tuple<Uri, string> destination = new Tuple<Uri, string>(_options.DestinationUri, _options.DestinationHttpClient);

    Tuple<int?, string> sourceTotal = await context.CallActivityAsync<Tuple<int?, string>>(nameof(GetTotalFromFhirAsync), source);
    logger.LogInformation("Retrieved the total number of resources from the source Azure API For FHIR.");

    Tuple<int?, string> destinationTotal = await context.CallActivityAsync<Tuple<int?, string>>(nameof(GetTotalFromFhirAsync), destination);
    logger.LogInformation("Retrieved the total number of resources from the destination FHIR Service.");

    return (sourceTotal, destinationTotal);
}

/// <summary>
/// Writes either the error text or the resource count of each total onto the export row.
/// A null error string is treated like an empty one (previously it caused a NullReferenceException).
/// </summary>
private static void ApplyResourceTotals(
    TableEntity exportEntity,
    Tuple<int?, string> sourceTotal,
    Tuple<int?, string> destinationTotal)
{
    if (string.IsNullOrEmpty(sourceTotal.Item2))
    {
        exportEntity["SourceResourceCount"] = sourceTotal.Item1.ToString();
    }
    else
    {
        exportEntity["SourceError"] = sourceTotal.Item2;
    }

    if (string.IsNullOrEmpty(destinationTotal.Item2))
    {
        exportEntity["DestinationResourceCount"] = destinationTotal.Item1.ToString();
    }
    else
    {
        exportEntity["DestinationError"] = destinationTotal.Item2;
    }
}

/// <summary>
/// Builds the properties shared by every "Import" telemetry event; callers add status-specific keys.
/// </summary>
private Dictionary<string, string> BuildImportEventProperties(
    string statusUrl,
    string importStatus,
    Tuple<int?, string> sourceTotal,
    Tuple<int?, string> destinationTotal)
{
    return new Dictionary<string, string>()
    {
        { "ImportId", _orchestrationHelper.GetProcessId(statusUrl) },
        { "StatusUrl", statusUrl },
        { "ImportStatus", importStatus },
        { "SourceResourceCount", sourceTotal.Item1.HasValue ? sourceTotal.Item1.Value.ToString() : " " },
        { "DestinationResourceCount", destinationTotal.Item1.HasValue ? destinationTotal.Item1.Value.ToString() : " " },
        { "SourceError", sourceTotal.Item2 ?? " " },
        { "DestinationError", destinationTotal.Item2 ?? " " },
    };
}

/// <summary>
/// Best-effort read of <c>issue[0].diagnostics</c> from a failure response body. Returns the
/// generic fallback text when the body is empty, is not JSON, or lacks that element, so that a
/// malformed body can no longer prevent the job from being marked as failed.
/// </summary>
private static string ExtractFailureDiagnostics(string? content, ILogger logger)
{
    const string DefaultDiagnostics = "For more information check Content location.";

    if (string.IsNullOrWhiteSpace(content))
    {
        return DefaultDiagnostics;
    }

    try
    {
        var firstIssue = (JObject.Parse(content)["issue"] as JArray)?.FirstOrDefault();
        return firstIssue?["diagnostics"]?.ToString() ?? DefaultDiagnostics;
    }
    catch (Exception ex)
    {
        // Deliberately broad: whatever goes wrong while reading diagnostics must not mask the
        // import failure that is being recorded.
        logger.LogWarning(ex, "Unable to read diagnostics from the import status response.");
        return DefaultDiagnostics;
    }
}

/// <summary>
/// Increments <c>CompletedCount</c> on the single unprocessed first export row (if exactly one
/// matches) and flags it as processed once the count reaches <c>PayloadCount</c>;
/// <paramref name="onAllPayloadsCompleted"/> runs at that point, before the row is saved.
/// </summary>
private void RecordPayloadCompletion(
    TableClient exportTableClient,
    ILogger logger,
    Action? onAllPayloadsCompleted)
{
    // Materialised once; Count() followed by foreach enumerated the pageable (and queried) twice.
    List<TableEntity> pendingJobs = exportTableClient
        .Query<TableEntity>(filter: ent => ent.GetBoolean("IsExportComplete") == true && ent.GetString("ImportRequest") == "Yes" && ent.GetBoolean("IsProcessed") == false && ent.GetBoolean("IsFirst") == true)
        .ToList();

    if (pendingJobs.Count != 1)
    {
        return;
    }

    TableEntity jobImport = pendingJobs[0];
    TableEntity jobEntity = _azureTableMetadataStore.GetEntity(exportTableClient, _options.PartitionKey, jobImport.RowKey);

    // A missing counter still surfaces as InvalidOperationException, now with a readable message.
    int payloadCount = jobImport.GetInt32("PayloadCount")
        ?? throw new InvalidOperationException("Export entity has no PayloadCount value.");
    int completeCount = (jobImport.GetInt32("CompletedCount")
        ?? throw new InvalidOperationException("Export entity has no CompletedCount value.")) + 1;

    if (payloadCount == completeCount)
    {
        jobEntity["IsProcessed"] = true;
        onAllPayloadsCompleted?.Invoke();
    }

    jobEntity["CompletedCount"] = completeCount;
    logger.LogInformation("Starting update of the export table.");
    _azureTableMetadataStore.UpdateEntity(exportTableClient, jobEntity);
    logger.LogInformation("Completed update of the export table.");
}

/// <summary>
/// Moves the chunk-table bookkeeping forward after every payload of an export has been imported:
/// the "since" value in parallel mode, otherwise the resource-type index and the
/// global/sub since-till values.
/// </summary>
private void AdvanceChunkState(
    TableClient chunkTableClient,
    TableEntity completedExportEntity,
    ILogger logger)
{
    TableEntity chunkEntity = _azureTableMetadataStore.GetEntity(chunkTableClient, _options.PartitionKey, _options.RowKey);

    if (_options.IsParallel == true)
    {
        object parallelTill = completedExportEntity["Till"];
        chunkEntity["since"] = parallelTill;
        SaveChunkEntity(chunkTableClient, chunkEntity, logger);
        TrackImportTill(parallelTill.ToString(), logger);
        return;
    }

    if (chunkEntity["multiExport"].ToString() == "Running")
    {
        if (chunkEntity["subTillExportType"].ToString() != chunkEntity["globalTillExportType"].ToString())
        {
            // A sub export finished but the global till is not reached yet: shift the sub values
            // (till becomes since, global till becomes the new sub till).
            chunkEntity["subSinceExportType"] = chunkEntity["subTillExportType"];
            chunkEntity["subTillExportType"] = chunkEntity["globalTillExportType"];
            SaveChunkEntity(chunkTableClient, chunkEntity, logger);
            return;
        }

        // Sub till equals global till: clear the flag and continue with the index handling below.
        chunkEntity["multiExport"] = "";
    }

    int resourceTypeIndex = (int)chunkEntity["resourceTypeIndex"];
    if ((int)chunkEntity["noOfResources"] - 1 != resourceTypeIndex)
    {
        // Import done for this resource type: move on to the next one.
        chunkEntity["resourceTypeIndex"] = resourceTypeIndex + 1;
        SaveChunkEntity(chunkTableClient, chunkEntity, logger);
        return;
    }

    // Last resource type: the till becomes the new since and everything else is reset.
    // Capture the till first; telemetry used to read it after the reset and always sent "".
    object closedWindowTill = chunkEntity["globalTillExportType"];
    chunkEntity["globalSinceExportType"] = closedWindowTill;
    chunkEntity["globalTillExportType"] = "";
    chunkEntity["resourceTypeIndex"] = 0;
    chunkEntity["subSinceExportType"] = "";
    chunkEntity["subTillExportType"] = "";
    SaveChunkEntity(chunkTableClient, chunkEntity, logger);
    TrackImportTill(closedWindowTill?.ToString(), logger);
}

/// <summary>
/// Persists the chunk entity, logging before and after the update.
/// </summary>
private void SaveChunkEntity(TableClient chunkTableClient, TableEntity chunkEntity, ILogger logger)
{
    logger.LogInformation("Starting update of the chunk table.");
    _azureTableMetadataStore.UpdateEntity(chunkTableClient, chunkEntity);
    logger.LogInformation("Completed update of the chunk table.");
}

/// <summary>
/// Emits the "ImportTill" telemetry event carrying the given till value.
/// </summary>
private void TrackImportTill(string? till, ILogger logger)
{
    logger.LogInformation("Updating logs in Application Insights.");
    _telemetryClient.TrackEvent(
        "ImportTill",
        new Dictionary<string, string>()
        {
            { "Till", till ?? string.Empty },
        });
    logger.LogInformation("Logs updated successfully in Application Insights.");
}