in src/Microsoft.Health.Dicom.Operations/Indexing/ReindexDurableFunction.Orchestration.cs [34:103]
public async Task ReindexInstancesAsync(
    [OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger logger)
{
    // Orchestrates a single pass of a re-indexing operation:
    //   1. Resolve the extended query tags associated with this operation.
    //   2. Ask for the next set of watermark batches below what has already been completed.
    //   3. Fan out one re-index activity per batch and wait for all of them.
    //   4. Restart the orchestration (ContinueAsNew) with the updated completed range,
    //      or, when no batches remain, run the completion activity for the tags.
    EnsureArg.IsNotNull(context, nameof(context));

    // Replace the logger with a replay-safe wrapper created by the orchestration context.
    logger = context.CreateReplaySafeLogger(logger);
    ReindexInput input = context.GetInput<ReindexInput>();

    // The ID should be a GUID as generated by the trigger, but we'll assert here just to make sure!
    // Leave a trace when the check fails so that the skipped operation can be diagnosed.
    if (!context.HasInstanceGuid())
    {
        logger.LogError(
            "The orchestration instance ID '{OperationId}' is not a GUID. Skipping the re-indexing operation.",
            context.InstanceId);
        return;
    }

    // Fetch the set of query tags that require re-indexing
    IReadOnlyList<ExtendedQueryTagStoreEntry> queryTags = await GetOperationQueryTagsAsync(context, input);
    logger.LogInformation(
        "Found {Count} extended query tag paths to re-index {{{TagPaths}}}.",
        queryTags.Count,
        string.Join(", ", queryTags.Select(x => x.Path)));

    // Nothing to do when the operation has no query tags.
    if (queryTags.Count == 0)
    {
        logger.LogWarning(
            "Could not find any query tags for the re-indexing operation '{OperationId}'.",
            context.InstanceId);
        return;
    }

    List<int> queryTagKeys = queryTags.Select(x => x.Key).ToList();

    // input.Completed is nullable: on the first execution the argument below evaluates to null,
    // otherwise it is the watermark immediately below the start of the completed range.
    IReadOnlyList<WatermarkRange> batches = await context.CallActivityWithRetryAsync<IReadOnlyList<WatermarkRange>>(
        nameof(GetInstanceBatchesV2Async),
        _options.ActivityRetryOptions,
        BatchCreationArguments.FromOptions(input.Completed?.Start - 1, _options));

    // No batches left: run the completion activity for the query tags and finish.
    if (batches.Count == 0)
    {
        IReadOnlyList<int> completedTagKeys = await context.CallActivityWithRetryAsync<IReadOnlyList<int>>(
            nameof(CompleteReindexingAsync),
            _options.ActivityRetryOptions,
            queryTagKeys);

        logger.LogInformation(
            "Completed re-indexing for the following extended query tags {{{QueryTagKeys}}}.",
            string.Join(", ", completedTagKeys));
        return;
    }

    // Note that batches are in reverse order because we start from the highest watermark
    var batchRange = new WatermarkRange(batches[^1].Start, batches[0].End);

    logger.LogInformation("Beginning to re-index the range {Range}.", batchRange);

    // Fan out: schedule every batch and wait for all of them to finish.
    await Task.WhenAll(batches
        .Select(x => context.CallActivityWithRetryAsync(
            nameof(ReindexBatchV2Async),
            _options.ActivityRetryOptions,
            ReindexBatchArguments.FromOptions(queryTags, x, _options))));

    // Create a new orchestration with the same instance ID to process the remaining data
    logger.LogInformation("Completed re-indexing the range {Range}. Continuing with new execution...", batchRange);

    // Extend the previously completed range downwards to the start of this pass,
    // keeping the original upper bound; on the first pass the range is just this pass.
    WatermarkRange completedRange = input.Completed.HasValue
        ? new WatermarkRange(batchRange.Start, input.Completed.Value.End)
        : batchRange;

    context.ContinueAsNew(new ReindexInput { QueryTagKeys = queryTagKeys, Completed = completedRange });
}