in src/Microsoft.Health.Fhir.Core/Features/Operations/Export/ExportJobTask.cs [382:476]
private async Task SearchWithFilter(
ExportJobConfiguration exportJobConfiguration,
ExportJobProgress progress,
string resourceType,
List<Tuple<string, string>> queryParametersList,
List<Tuple<string, string>> sharedQueryParametersList,
IAnonymizer anonymizer,
string batchIdPrefix,
CancellationToken cancellationToken)
{
// Current batch will be used to organize a set of search results into a group so that they can be committed together.
string currentBatchId = batchIdPrefix + progress.Page.ToString("d6");
// Process the export if:
// 1. There is continuation token, which means there is more resource to be exported.
// 2. There is no continuation token but the page is 0, which means it's the initial export.
while (progress.ContinuationToken != null || progress.Page == 0)
{
SearchResult searchResult = null;
// Search and process the results.
switch (_exportJobRecord.ExportType)
{
case ExportJobType.All:
case ExportJobType.Patient:
using (IScoped<ISearchService> searchService = _searchServiceFactory())
{
searchResult = await searchService.Value.SearchAsync(
resourceType: resourceType,
queryParametersList,
cancellationToken);
}
break;
case ExportJobType.Group:
searchResult = await GetGroupPatients(
_exportJobRecord.GroupId,
queryParametersList,
_exportJobRecord.QueuedTime,
cancellationToken);
break;
}
if (_exportJobRecord.ExportType == ExportJobType.Patient || _exportJobRecord.ExportType == ExportJobType.Group)
{
uint resultIndex = 0;
foreach (SearchResultEntry result in searchResult.Results)
{
// If a job is resumed in the middle of processing patient compartment resources it will skip patients it has already exported compartment information for.
// This assumes the order of the search results is the same every time the same search is performed.
if (progress.SubSearch != null && result.Resource.ResourceId != progress.SubSearch.TriggeringResourceId)
{
resultIndex++;
continue;
}
if (progress.SubSearch == null)
{
progress.NewSubSearch(result.Resource.ResourceId);
}
await RunExportCompartmentSearch(exportJobConfiguration, progress.SubSearch, sharedQueryParametersList, anonymizer, cancellationToken, currentBatchId + ":" + resultIndex.ToString("d6"));
resultIndex++;
progress.ClearSubSearch();
}
}
// Skips processing top level search results if the job only requested resources from the compartments of patients, but didn't want the patients.
if (_exportJobRecord.ExportType == ExportJobType.All
|| string.IsNullOrWhiteSpace(_exportJobRecord.ResourceType)
|| _exportJobRecord.ResourceType.Contains(KnownResourceTypes.Patient, StringComparison.OrdinalIgnoreCase))
{
await ProcessSearchResultsAsync(searchResult.Results, currentBatchId, anonymizer, cancellationToken);
}
if (searchResult.ContinuationToken == null)
{
// No more continuation token, we are done.
break;
}
await ProcessProgressChange(
exportJobConfiguration,
progress,
queryParametersList,
searchResult.ContinuationToken,
forceCommit: _exportJobRecord.ExportType == ExportJobType.Patient || _exportJobRecord.ExportType == ExportJobType.Group,
cancellationToken);
currentBatchId = batchIdPrefix + progress.Page.ToString("d6");
}
// Commit one last time for any pending changes.
await _exportDestinationClient.CommitAsync(exportJobConfiguration, cancellationToken);
}