public async Task ExecuteAsync()

in src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportProcessingTask.cs [67:220]


        public async Task<TaskResultData> ExecuteAsync()
        {
            var fhirRequestContext = new FhirRequestContext(
                    method: "Import",
                    uriString: _inputData.UriString,
                    baseUriString: _inputData.BaseUriString,
                    correlationId: _inputData.TaskId,
                    requestHeaders: new Dictionary<string, StringValues>(),
                    responseHeaders: new Dictionary<string, StringValues>())
            {
                IsBackgroundTask = true,
            };

            _contextAccessor.RequestContext = fhirRequestContext;

            CancellationToken cancellationToken = _cancellationTokenSource.Token;

            long succeedImportCount = _importProgress.SucceedImportCount;
            long failedImportCount = _importProgress.FailedImportCount;

            ImportProcessingTaskResult result = new ImportProcessingTaskResult();
            result.ResourceType = _inputData.ResourceType;

            try
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    throw new OperationCanceledException();
                }

                Func<long, long> sequenceIdGenerator = (index) => _inputData.BeginSequenceId + index;

                // Clean resources before import start
                await _resourceBulkImporter.CleanResourceAsync(_inputData, _importProgress, cancellationToken);
                _importProgress.NeedCleanData = true;
                await _contextUpdater.UpdateContextAsync(JsonConvert.SerializeObject(_importProgress), cancellationToken);

                // Initialize error store
                IImportErrorStore importErrorStore = await _importErrorStoreFactory.InitializeAsync(GetErrorFileName(), cancellationToken);
                result.ErrorLogLocation = importErrorStore.ErrorFileLocation;

                // Load and parse resource from bulk resource
                (Channel<ImportResource> importResourceChannel, Task loadTask) = _importResourceLoader.LoadResources(_inputData.ResourceLocation, _importProgress.CurrentIndex, _inputData.ResourceType, sequenceIdGenerator, cancellationToken);

                // Import to data store
                (Channel<ImportProcessingProgress> progressChannel, Task importTask) = _resourceBulkImporter.Import(importResourceChannel, importErrorStore, cancellationToken);

                // Update progress for checkpoints
                await foreach (ImportProcessingProgress progress in progressChannel.Reader.ReadAllAsync())
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        throw new OperationCanceledException("Import task is canceled by user.");
                    }

                    _importProgress.SucceedImportCount = progress.SucceedImportCount + succeedImportCount;
                    _importProgress.FailedImportCount = progress.FailedImportCount + failedImportCount;
                    _importProgress.CurrentIndex = progress.CurrentIndex;
                    result.SucceedCount = _importProgress.SucceedImportCount;
                    result.FailedCount = _importProgress.FailedImportCount;

                    _logger.LogInformation("Import task progress: {0}", JsonConvert.SerializeObject(_importProgress));

                    try
                    {
                        await _contextUpdater.UpdateContextAsync(JsonConvert.SerializeObject(_importProgress), cancellationToken);
                    }
                    catch (Exception ex)
                    {
                        // ignore exception for progresss update
                        _logger.LogInformation(ex, "Failed to update context.");
                    }
                }

                // Pop up exception during load & import
                // Put import task before load task for resource channel full and blocking issue.
                try
                {
                    await importTask;
                }
                catch (TaskCanceledException)
                {
                    throw;
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to import data.");
                    throw new RetriableTaskException("Failed to import data.", ex);
                }

                try
                {
                    await loadTask;
                }
                catch (TaskCanceledException)
                {
                    throw;
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to load data.");
                    throw new RetriableTaskException("Failed to load data", ex);
                }

                return new TaskResultData(TaskResult.Success, JsonConvert.SerializeObject(result));
            }
            catch (TaskCanceledException canceledEx)
            {
                _logger.LogInformation(canceledEx, "Data processing task is canceled.");

                await CleanResourceForFailureAsync(canceledEx);

                return new TaskResultData(TaskResult.Canceled, JsonConvert.SerializeObject(result));
            }
            catch (OperationCanceledException canceledEx)
            {
                _logger.LogInformation(canceledEx, "Data processing task is canceled.");

                await CleanResourceForFailureAsync(canceledEx);

                return new TaskResultData(TaskResult.Canceled, JsonConvert.SerializeObject(result));
            }
            catch (RetriableTaskException retriableEx)
            {
                _logger.LogInformation(retriableEx, "Error in data processing task.");

                await CleanResourceForFailureAsync(retriableEx);

                throw;
            }
            catch (Exception ex)
            {
                _logger.LogInformation(ex, "Critical error in data processing task.");

                await CleanResourceForFailureAsync(ex);

                throw new RetriableTaskException(ex.Message);
            }
            finally
            {
                if (!_cancellationTokenSource.IsCancellationRequested)
                {
                    _cancellationTokenSource.Cancel();
                }
            }
        }