in src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs [91:240]
public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETag, CancellationToken cancellationToken)
{
    // Runs the export described by exportJobRecord:
    //   1. installs a background request context for the duration of the call,
    //   2. validates the destination settings and the type filters,
    //   3. connects to the export destination and runs the export search,
    //   4. calls CompleteJobAsync with Completed, or with Failed after storing FailureDetails.
    // JobConflictException and RequestRateExceededException are only logged; the job record is not updated for them.
    EnsureArg.IsNotNull(exportJobRecord, nameof(exportJobRecord));

    _exportJobRecord = exportJobRecord;
    _weakETag = weakETag;
    _fileManager = new ExportFileManager(_exportJobRecord, _exportDestinationClient);

    // Remember whatever context was active so it can be put back in the finally block.
    var previousRequestContext = _contextAccessor.RequestContext;

    try
    {
        // Add a request context so that bundle issues can be added by the SearchOptionFactory.
        _contextAccessor.RequestContext = new FhirRequestContext(
            method: "Export",
            uriString: "$export",
            baseUriString: "$export",
            correlationId: _exportJobRecord.Id,
            requestHeaders: new Dictionary<string, StringValues>(),
            responseHeaders: new Dictionary<string, StringValues>())
        {
            IsBackgroundTask = true,
        };

        // Hash of the currently configured connection string; empty when no connection string is configured.
        string configuredConnection = _exportJobConfiguration.StorageAccountConnection;
        string currentConnectionHash = string.Empty;
        if (!string.IsNullOrEmpty(configuredConnection))
        {
            currentConnectionHash = Health.Core.Extensions.StringExtensions.ComputeHash(configuredConnection);
        }

        ExportJobConfiguration destinationConfiguration;
        if (!string.IsNullOrEmpty(exportJobRecord.StorageAccountUri))
        {
            // The job record carries its own storage account URI: build a configuration around it,
            // copying only the Enabled flag from the configured settings.
            destinationConfiguration = new ExportJobConfiguration
            {
                Enabled = _exportJobConfiguration.Enabled,
                StorageAccountUri = exportJobRecord.StorageAccountUri,
            };
        }
        else if (string.Equals(exportJobRecord.StorageAccountConnectionHash, currentConnectionHash, StringComparison.Ordinal))
        {
            // The hash stored on the job record still matches the configured connection string.
            destinationConfiguration = _exportJobConfiguration;
        }
        else
        {
            throw new DestinationConnectionException("Storage account connection string was updated during an export job.", HttpStatusCode.BadRequest);
        }

        // Type filters are rejected when the job record does not also specify a resource type.
        bool hasTypeFilters = _exportJobRecord.Filters?.Count > 0;
        if (hasTypeFilters && string.IsNullOrEmpty(_exportJobRecord.ResourceType))
        {
            throw new BadRequestException(Resources.TypeFilterWithoutTypeIsUnsupported);
        }

        // Connect to export destination using appropriate client.
        await _exportDestinationClient.ConnectAsync(destinationConfiguration, cancellationToken, _exportJobRecord.StorageAccountContainerName);

        // A resumed job already has progress info on its record.
        // When there is none we are processing a new job, so start from the beginning.
        if (_exportJobRecord.Progress == null)
        {
            _exportJobRecord.Progress = new ExportJobProgress(continuationToken: null, page: 0);
        }

        // The initial list of query parameters has no continuation token.
        // One is added later if the search result returns it.
        string pageSize = _exportJobRecord.MaximumNumberOfResourcesPerQuery.ToString(CultureInfo.InvariantCulture);
        string queuedTimestamp = _exportJobRecord.QueuedTime.ToString("o", CultureInfo.InvariantCulture);

        var searchParameters = new List<Tuple<string, string>>();
        searchParameters.Add(Tuple.Create(KnownQueryParameterNames.Count, pageSize));
        searchParameters.Add(Tuple.Create(KnownQueryParameterNames.LastUpdated, $"le{queuedTimestamp}"));

        if (_exportJobRecord.Since != null)
        {
            searchParameters.Add(Tuple.Create(KnownQueryParameterNames.LastUpdated, $"ge{_exportJobRecord.Since}"));
        }

        await RunExportSearch(destinationConfiguration, _exportJobRecord.Progress, searchParameters, cancellationToken);

        await CompleteJobAsync(OperationStatus.Completed, cancellationToken);

        _logger.LogTrace("Successfully completed the job.");
    }
    catch (JobConflictException)
    {
        // The export job was updated externally. There might be some additional resources that were exported
        // but we will not be updating the job record.
        _logger.LogTrace("The job was updated by another process.");
    }
    catch (RequestRateExceededException)
    {
        // Only traced; the job record is left as it is.
        _logger.LogTrace("Job failed due to RequestRateExceeded.");
    }
    catch (DestinationConnectionException dce)
    {
        _logger.LogError(dce, "Can't connect to destination. The job will be marked as failed.");
        await MarkJobAsFailedAsync(dce.Message, dce.StatusCode);
    }
    catch (ResourceNotFoundException rnfe)
    {
        _logger.LogError(rnfe, "Can't find specified resource. The job will be marked as failed.");
        await MarkJobAsFailedAsync(rnfe.Message, HttpStatusCode.BadRequest);
    }
    catch (FailedToParseAnonymizationConfigurationException ex)
    {
        _logger.LogError(ex, "Failed to parse anonymization configuration. The job will be marked as failed.");
        await MarkJobAsFailedAsync(ex.Message, HttpStatusCode.BadRequest);
    }
    catch (FailedToAnonymizeResourceException ex)
    {
        _logger.LogError(ex, "Failed to anonymize resource. The job will be marked as failed.");
        await MarkJobAsFailedAsync(string.Format(Resources.FailedToAnonymizeResource, ex.Message), HttpStatusCode.BadRequest);
    }
    catch (AnonymizationConfigurationNotFoundException ex)
    {
        _logger.LogError(ex, "Cannot found anonymization configuration. The job will be marked as failed.");
        await MarkJobAsFailedAsync(ex.Message, HttpStatusCode.BadRequest);
    }
    catch (AnonymizationConfigurationFetchException ex)
    {
        _logger.LogError(ex, "Failed to fetch anonymization configuration file. The job will be marked as failed.");
        await MarkJobAsFailedAsync(ex.Message, HttpStatusCode.BadRequest);
    }
    catch (Exception ex)
    {
        // The job has encountered an error it cannot recover from.
        // Try to update the job to failed state.
        _logger.LogError(ex, "Encountered an unhandled exception. The job will be marked as failed.");
        await MarkJobAsFailedAsync(Resources.UnknownError, HttpStatusCode.InternalServerError);
    }
    finally
    {
        // Put back the request context that was active before this method replaced it.
        _contextAccessor.RequestContext = previousRequestContext;
    }

    // Stores the failure reason and status code on the job record, then completes the job with the Failed status.
    async Task MarkJobAsFailedAsync(string failureReason, HttpStatusCode statusCode)
    {
        _exportJobRecord.FailureDetails = new JobFailureDetails(failureReason, statusCode);
        await CompleteJobAsync(OperationStatus.Failed, cancellationToken);
    }
}