in src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportProcessingTask.cs [67:220]
public async Task<TaskResultData> ExecuteAsync()
{
var fhirRequestContext = new FhirRequestContext(
method: "Import",
uriString: _inputData.UriString,
baseUriString: _inputData.BaseUriString,
correlationId: _inputData.TaskId,
requestHeaders: new Dictionary<string, StringValues>(),
responseHeaders: new Dictionary<string, StringValues>())
{
IsBackgroundTask = true,
};
_contextAccessor.RequestContext = fhirRequestContext;
CancellationToken cancellationToken = _cancellationTokenSource.Token;
long succeedImportCount = _importProgress.SucceedImportCount;
long failedImportCount = _importProgress.FailedImportCount;
ImportProcessingTaskResult result = new ImportProcessingTaskResult();
result.ResourceType = _inputData.ResourceType;
try
{
if (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException();
}
Func<long, long> sequenceIdGenerator = (index) => _inputData.BeginSequenceId + index;
// Clean resources before import start
await _resourceBulkImporter.CleanResourceAsync(_inputData, _importProgress, cancellationToken);
_importProgress.NeedCleanData = true;
await _contextUpdater.UpdateContextAsync(JsonConvert.SerializeObject(_importProgress), cancellationToken);
// Initialize error store
IImportErrorStore importErrorStore = await _importErrorStoreFactory.InitializeAsync(GetErrorFileName(), cancellationToken);
result.ErrorLogLocation = importErrorStore.ErrorFileLocation;
// Load and parse resource from bulk resource
(Channel<ImportResource> importResourceChannel, Task loadTask) = _importResourceLoader.LoadResources(_inputData.ResourceLocation, _importProgress.CurrentIndex, _inputData.ResourceType, sequenceIdGenerator, cancellationToken);
// Import to data store
(Channel<ImportProcessingProgress> progressChannel, Task importTask) = _resourceBulkImporter.Import(importResourceChannel, importErrorStore, cancellationToken);
// Update progress for checkpoints
await foreach (ImportProcessingProgress progress in progressChannel.Reader.ReadAllAsync())
{
if (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException("Import task is canceled by user.");
}
_importProgress.SucceedImportCount = progress.SucceedImportCount + succeedImportCount;
_importProgress.FailedImportCount = progress.FailedImportCount + failedImportCount;
_importProgress.CurrentIndex = progress.CurrentIndex;
result.SucceedCount = _importProgress.SucceedImportCount;
result.FailedCount = _importProgress.FailedImportCount;
_logger.LogInformation("Import task progress: {0}", JsonConvert.SerializeObject(_importProgress));
try
{
await _contextUpdater.UpdateContextAsync(JsonConvert.SerializeObject(_importProgress), cancellationToken);
}
catch (Exception ex)
{
// ignore exception for progresss update
_logger.LogInformation(ex, "Failed to update context.");
}
}
// Pop up exception during load & import
// Put import task before load task for resource channel full and blocking issue.
try
{
await importTask;
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to import data.");
throw new RetriableTaskException("Failed to import data.", ex);
}
try
{
await loadTask;
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to load data.");
throw new RetriableTaskException("Failed to load data", ex);
}
return new TaskResultData(TaskResult.Success, JsonConvert.SerializeObject(result));
}
catch (TaskCanceledException canceledEx)
{
_logger.LogInformation(canceledEx, "Data processing task is canceled.");
await CleanResourceForFailureAsync(canceledEx);
return new TaskResultData(TaskResult.Canceled, JsonConvert.SerializeObject(result));
}
catch (OperationCanceledException canceledEx)
{
_logger.LogInformation(canceledEx, "Data processing task is canceled.");
await CleanResourceForFailureAsync(canceledEx);
return new TaskResultData(TaskResult.Canceled, JsonConvert.SerializeObject(result));
}
catch (RetriableTaskException retriableEx)
{
_logger.LogInformation(retriableEx, "Error in data processing task.");
await CleanResourceForFailureAsync(retriableEx);
throw;
}
catch (Exception ex)
{
_logger.LogInformation(ex, "Critical error in data processing task.");
await CleanResourceForFailureAsync(ex);
throw new RetriableTaskException(ex.Message);
}
finally
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}
}
}