in src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorTask.cs [84:252]
/// <summary>
/// Runs the import orchestration as a sequence of stages gated on
/// <c>_orchestratorTaskContext.Progress</c>: <c>ValidateResourcesAsync</c>, data store
/// <c>PreprocessAsync</c>, <c>GenerateSubTaskRecordsAsync</c> and
/// <c>ExecuteDataProcessingTasksAsync</c>. After each stage the progress value is advanced and
/// handed to <c>UpdateProgressAsync</c>; a stage only runs when the current progress equals the
/// value left by the stage before it, so stages already recorded as complete are skipped.
/// Once the stages finish (or end with a handled failure or cancellation), the data store
/// <c>PostprocessAsync</c> is invoked if progress is beyond <c>InputResourcesValidated</c>.
/// </summary>
/// <returns>
/// <c>TaskResult.Success</c> with the serialized import result when no exception was handled;
/// <c>TaskResult.Canceled</c> with the exception message when the run was cancelled;
/// <c>TaskResult.Fail</c> with a serialized <c>ImportTaskErrorResult</c> for
/// <c>IntegrationDataStoreException</c>, <c>ImportFileEtagNotMatchException</c> and
/// <c>ImportProcessingException</c>.
/// </returns>
/// <exception cref="RetriableTaskException">
/// Thrown, carrying a serialized <c>ImportTaskErrorResult</c>, when any other exception escapes
/// the stages or when the postprocess step fails.
/// </exception>
public async Task<TaskResultData> ExecuteAsync()
{
    // This task has no incoming HTTP request, so a request context is built from the
    // orchestrator input data and flagged as a background task.
    var fhirRequestContext = new FhirRequestContext(
        method: "Import",
        uriString: _orchestratorInputData.RequestUri.ToString(),
        baseUriString: _orchestratorInputData.BaseUri.ToString(),
        correlationId: _orchestratorInputData.TaskId,
        requestHeaders: new Dictionary<string, StringValues>(),
        responseHeaders: new Dictionary<string, StringValues>())
    {
        IsBackgroundTask = true,
    };

    _contextAccessor.RequestContext = fhirRequestContext;

    CancellationToken cancellationToken = _cancellationTokenSource.Token;

    // taskResultData stays null unless one of the handlers below records an outcome.
    TaskResultData taskResultData = null;

    // Kept outside the try block so a postprocess failure can attach it as the inner error.
    ImportTaskErrorResult errorResult = null;

    try
    {
        cancellationToken.ThrowIfCancellationRequested();

        if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.Initialized)
        {
            await ValidateResourcesAsync(cancellationToken);

            _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.InputResourcesValidated;
            await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

            _logger.LogInformation("Input Resources Validated");
        }

        if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.InputResourcesValidated)
        {
            await _importOrchestratorTaskDataStoreOperation.PreprocessAsync(cancellationToken);

            _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.PreprocessCompleted;
            await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

            _logger.LogInformation("Preprocess Completed");
        }

        if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.PreprocessCompleted)
        {
            _orchestratorTaskContext.DataProcessingTasks = await GenerateSubTaskRecordsAsync(cancellationToken);

            _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.SubTaskRecordsGenerated;
            await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

            _logger.LogInformation("SubTask Records Generated");
        }

        if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.SubTaskRecordsGenerated)
        {
            _orchestratorTaskContext.ImportResult = await ExecuteDataProcessingTasksAsync(cancellationToken);

            _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.SubTasksCompleted;
            await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

            _logger.LogInformation("SubTasks Completed");
        }

        // Stamped on every pass that reaches this point, including a run in which all of the
        // stages above were skipped because they were already recorded as complete.
        _orchestratorTaskContext.ImportResult.TransactionTime = _orchestratorInputData.TaskCreateTime;
    }
    catch (OperationCanceledException canceledEx)
    {
        // TaskCanceledException derives from OperationCanceledException, so this single
        // handler covers both cancellation exception types.
        _logger.LogInformation(canceledEx, "Import task canceled. {0}", canceledEx.Message);

        await CancelProcessingTasksAsync();
        taskResultData = new TaskResultData(TaskResult.Canceled, canceledEx.Message);
    }
    catch (IntegrationDataStoreException integrationDataStoreEx)
    {
        _logger.LogInformation(integrationDataStoreEx, "Failed to access input files.");

        // The status code carried by the exception is passed through unchanged.
        errorResult = new ImportTaskErrorResult()
        {
            HttpStatusCode = integrationDataStoreEx.StatusCode,
            ErrorMessage = integrationDataStoreEx.Message,
        };

        taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
    }
    catch (ImportFileEtagNotMatchException eTagEx)
    {
        _logger.LogInformation(eTagEx, "Import file etag not match.");

        errorResult = new ImportTaskErrorResult()
        {
            HttpStatusCode = HttpStatusCode.BadRequest,
            ErrorMessage = eTagEx.Message,
        };

        taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
    }
    catch (ImportProcessingException processingEx)
    {
        _logger.LogInformation(processingEx, "Failed to process input resources.");

        errorResult = new ImportTaskErrorResult()
        {
            HttpStatusCode = HttpStatusCode.BadRequest,
            ErrorMessage = processingEx.Message,
        };

        taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
    }
    catch (Exception ex)
    {
        _logger.LogInformation(ex, "Failed to import data.");

        errorResult = new ImportTaskErrorResult()
        {
            HttpStatusCode = HttpStatusCode.InternalServerError,
            ErrorMessage = ex.Message,
        };

        // Any other exception leaves the method here as a RetriableTaskException, so the
        // postprocess step below is not reached on this path.
        await SendImportMetricsNotification(TaskResult.Fail);
        throw new RetriableTaskException(JsonConvert.SerializeObject(errorResult));
    }

    // Reached on success and after a handled cancellation or failure.
    // NOTE(review): Progress only moves past InputResourcesValidated after PreprocessAsync has
    // returned, so a handled failure or cancellation raised while PreprocessAsync is still
    // running skips postprocess — confirm PreprocessAsync leaves nothing to undo in that case.
    if (_orchestratorTaskContext.Progress > ImportOrchestratorTaskProgress.InputResourcesValidated)
    {
        // Post-process operation cannot be cancelled.
        try
        {
            await _importOrchestratorTaskDataStoreOperation.PostprocessAsync(CancellationToken.None);

            _logger.LogInformation("Postprocess Completed");
        }
        catch (Exception ex)
        {
            _logger.LogInformation(ex, "Failed at postprocess step.");

            ImportTaskErrorResult postprocessErrorResult = new ImportTaskErrorResult()
            {
                HttpStatusCode = HttpStatusCode.InternalServerError,
                ErrorMessage = ex.Message,

                // Error recorded by the handlers above, if any (null otherwise).
                InnerError = errorResult,
            };

            await SendImportMetricsNotification(TaskResult.Fail);
            throw new RetriableTaskException(JsonConvert.SerializeObject(postprocessErrorResult));
        }
    }

    // Still null only when the try block completed without a handled exception.
    if (taskResultData == null)
    {
        taskResultData = new TaskResultData(TaskResult.Success, JsonConvert.SerializeObject(_orchestratorTaskContext.ImportResult));
    }

    await SendImportMetricsNotification(taskResultData.Result);

    return taskResultData;
}