public async Task ExecuteAsync()

in src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportOrchestratorTask.cs [84:252]


        public async Task<TaskResultData> ExecuteAsync()
        {
            var fhirRequestContext = new FhirRequestContext(
                    method: "Import",
                    uriString: _orchestratorInputData.RequestUri.ToString(),
                    baseUriString: _orchestratorInputData.BaseUri.ToString(),
                    correlationId: _orchestratorInputData.TaskId,
                    requestHeaders: new Dictionary<string, StringValues>(),
                    responseHeaders: new Dictionary<string, StringValues>())
            {
                IsBackgroundTask = true,
            };

            _contextAccessor.RequestContext = fhirRequestContext;

            CancellationToken cancellationToken = _cancellationTokenSource.Token;

            TaskResultData taskResultData = null;
            ImportTaskErrorResult errorResult = null;
            try
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    throw new OperationCanceledException();
                }

                if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.Initialized)
                {
                    await ValidateResourcesAsync(cancellationToken);

                    _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.InputResourcesValidated;
                    await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

                    _logger.LogInformation("Input Resources Validated");
                }

                if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.InputResourcesValidated)
                {
                    await _importOrchestratorTaskDataStoreOperation.PreprocessAsync(cancellationToken);

                    _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.PreprocessCompleted;
                    await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

                    _logger.LogInformation("Preprocess Completed");
                }

                if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.PreprocessCompleted)
                {
                    _orchestratorTaskContext.DataProcessingTasks = await GenerateSubTaskRecordsAsync(cancellationToken);
                    _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.SubTaskRecordsGenerated;
                    await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

                    _logger.LogInformation("SubTask Records Generated");
                }

                if (_orchestratorTaskContext.Progress == ImportOrchestratorTaskProgress.SubTaskRecordsGenerated)
                {
                    _orchestratorTaskContext.ImportResult = await ExecuteDataProcessingTasksAsync(cancellationToken);

                    _orchestratorTaskContext.Progress = ImportOrchestratorTaskProgress.SubTasksCompleted;
                    await UpdateProgressAsync(_orchestratorTaskContext, cancellationToken);

                    _logger.LogInformation("SubTasks Completed");
                }

                _orchestratorTaskContext.ImportResult.TransactionTime = _orchestratorInputData.TaskCreateTime;
            }
            catch (TaskCanceledException taskCanceledEx)
            {
                _logger.LogInformation(taskCanceledEx, "Import task canceled. {0}", taskCanceledEx.Message);

                await CancelProcessingTasksAsync();
                taskResultData = new TaskResultData(TaskResult.Canceled, taskCanceledEx.Message);
            }
            catch (OperationCanceledException canceledEx)
            {
                _logger.LogInformation(canceledEx, "Import task canceled. {0}", canceledEx.Message);

                await CancelProcessingTasksAsync();
                taskResultData = new TaskResultData(TaskResult.Canceled, canceledEx.Message);
            }
            catch (IntegrationDataStoreException integrationDataStoreEx)
            {
                _logger.LogInformation(integrationDataStoreEx, "Failed to access input files.");

                errorResult = new ImportTaskErrorResult()
                {
                    HttpStatusCode = integrationDataStoreEx.StatusCode,
                    ErrorMessage = integrationDataStoreEx.Message,
                };

                taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
            }
            catch (ImportFileEtagNotMatchException eTagEx)
            {
                _logger.LogInformation(eTagEx, "Import file etag not match.");

                errorResult = new ImportTaskErrorResult()
                {
                    HttpStatusCode = HttpStatusCode.BadRequest,
                    ErrorMessage = eTagEx.Message,
                };

                taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
            }
            catch (ImportProcessingException processingEx)
            {
                _logger.LogInformation(processingEx, "Failed to process input resources.");

                errorResult = new ImportTaskErrorResult()
                {
                    HttpStatusCode = HttpStatusCode.BadRequest,
                    ErrorMessage = processingEx.Message,
                };

                taskResultData = new TaskResultData(TaskResult.Fail, JsonConvert.SerializeObject(errorResult));
            }
            catch (Exception ex)
            {
                _logger.LogInformation(ex, "Failed to import data.");

                errorResult = new ImportTaskErrorResult()
                {
                    HttpStatusCode = HttpStatusCode.InternalServerError,
                    ErrorMessage = ex.Message,
                };

                await SendImportMetricsNotification(TaskResult.Fail);

                throw new RetriableTaskException(JsonConvert.SerializeObject(errorResult));
            }

            if (_orchestratorTaskContext.Progress > ImportOrchestratorTaskProgress.InputResourcesValidated)
            {
                // Post-process operation cannot be cancelled.
                try
                {
                    await _importOrchestratorTaskDataStoreOperation.PostprocessAsync(CancellationToken.None);

                    _logger.LogInformation("Postprocess Completed");
                }
                catch (Exception ex)
                {
                    _logger.LogInformation(ex, "Failed at postprocess step.");

                    ImportTaskErrorResult postProcessEerrorResult = new ImportTaskErrorResult()
                    {
                        HttpStatusCode = HttpStatusCode.InternalServerError,
                        ErrorMessage = ex.Message,

                        // other error if any.
                        InnerError = errorResult,
                    };

                    await SendImportMetricsNotification(TaskResult.Fail);

                    throw new RetriableTaskException(JsonConvert.SerializeObject(postProcessEerrorResult));
                }
            }

            if (taskResultData == null) // No exception
            {
                taskResultData = new TaskResultData(TaskResult.Success, JsonConvert.SerializeObject(_orchestratorTaskContext.ImportResult));
            }

            await SendImportMetricsNotification(taskResultData.Result);

            return taskResultData;
        }