in src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobTask.cs [314:432]
/// <summary>
/// Runs the reindex job's queries until none of them is queued or running.
/// </summary>
/// <remarks>
/// Each loop iteration:
/// <list type="bullet">
/// <item>starts the oldest queued query without awaiting it;</item>
/// <item>requeues running queries whose <c>LastModified</c> is older than
/// <c>JobHeartbeatTimeoutThreshold</c>, cancelling the tasks working on them;</item>
/// <item>waits the configured query delay plus the throttle delay;</item>
/// <item>waits for a query task to finish once <c>MaximumConcurrency</c> tasks are in flight;</item>
/// <item>re-reads the job's ETag and status from the store.</item>
/// </list>
/// Every query task gets its own token source, linked to the job's cancellation token, so cancelling
/// the job also cancels the queries. If this method exits early for any reason, query tasks still
/// in flight are cancelled rather than left orphaned.
/// </remarks>
/// <returns>A task that completes after every started query task has finished.</returns>
/// <exception cref="OperationCanceledException">
/// The job's cancellation token was cancelled, or the re-read job record has status
/// <see cref="OperationStatus.Canceled"/>.
/// </exception>
/// <remarks>
/// A failure from a query task is rethrown. The exception is not rethrown when the task ended
/// cancelled because this method cancelled its token source.
/// </remarks>
private async Task ProcessJob()
{
    // Every started query task, together with the query it processes and the token source that can cancel it.
    // Keyed by task rather than by query: a stale query is requeued and may be started again while the
    // cancelled task from its earlier attempt is still finishing, so one query can own several tasks.
    var runningQueries = new Dictionary<Task<ReindexJobQueryStatus>, (ReindexJobQueryStatus Query, CancellationTokenSource TokenSource)>();

    try
    {
        // while not all queries are finished
        while (_reindexJobRecord.QueryList.Keys.Any(q => q.Status == OperationStatus.Queued || q.Status == OperationStatus.Running))
        {
            // grab the oldest query labeled as queued (if any) and start it without awaiting it
            ReindexJobQueryStatus query = _reindexJobRecord.QueryList.Keys
                .Where(q => q.Status == OperationStatus.Queued)
                .OrderBy(q => q.LastModified)
                .FirstOrDefault();

            if (query != null)
            {
                // Linked to the job token so the query is also cancelled when the job's token is cancelled,
                // even while this loop is inside Task.Delay.
                var queryTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken);

                // ProcessQueryAsync is not awaited, so it may not have changed the query's status before the
                // next iteration; marking it running here keeps the same query from being picked twice.
                query.Status = OperationStatus.Running;
                query.LastModified = Clock.UtcNow;

                runningQueries.Add(ProcessQueryAsync(query, queryTokenSource.Token), (query, queryTokenSource));
                _logger.LogInformation($"Reindex job task created {runningQueries.Count} Tasks");
            }

            await ResetStaleQueriesAsync();

            var averageDbConsumption = _throttleController.UpdateDatastoreUsage();
            _logger.LogInformation($"Reindex avaerage DB consumption: {averageDbConsumption}");
            var throttleDelayTime = _throttleController.GetThrottleBasedDelay();
            _logger.LogInformation($"Reindex throttle delay: {throttleDelayTime}");
            await Task.Delay(_reindexJobRecord.QueryDelayIntervalInMilliseconds + throttleDelayTime, _cancellationToken);

            // At capacity: wait (asynchronously, cancellable) until at least one query task has finished.
            if (runningQueries.Count > 0 && runningQueries.Count >= _reindexJobRecord.MaximumConcurrency)
            {
                await WaitForAnyQueryAsync();
            }

            // Always prune finished tasks so the concurrency count reflects only tasks still in flight.
            await RemoveCompletedQueriesAsync();

            // For most cases, if another process updates the job (such as a DELETE request), the _etag change
            // causes a JobConflict exception and this task is aborted; this adds one more check before
            // starting another iteration of the loop.
            await RefreshJobStatusAsync();

            // If the job's token was cancelled, or the job has been marked canceled, stop. The finally block
            // passes the cancellation on to every query task that is still running.
            if (_cancellationToken.IsCancellationRequested || _reindexJobRecord.Status == OperationStatus.Canceled)
            {
                _logger.LogInformation("Reindex Job canceled.");
                throw new OperationCanceledException("ReindexJob canceled.");
            }
        }

        // No query is queued or running any more, but some tasks may still be finishing
        // (e.g. an attempt cancelled as stale whose query was completed by a retry).
        while (runningQueries.Count > 0)
        {
            await WaitForAnyQueryAsync();
            await RemoveCompletedQueriesAsync();
        }
    }
    finally
    {
        // Only non-empty on an early exit (cancellation, conflict, failure). Stop tasks that are still running
        // instead of orphaning them, and release the token sources of those that already finished.
        // Token sources of running tasks are not disposed: their tasks may still be using the tokens.
        foreach (var entry in runningQueries)
        {
            if (entry.Key.IsCompleted)
            {
                entry.Value.TokenSource.Dispose();
            }
            else
            {
                TryCancel(entry.Value.TokenSource);
            }
        }
    }

    // Requeues running queries whose LastModified is older than the heartbeat threshold, cancels every task
    // still working on such a query, and persists the job after each reset.
    async Task ResetStaleQueriesAsync()
    {
        var staleCutoff = Clock.UtcNow - _reindexJobConfiguration.JobHeartbeatTimeoutThreshold;

        // Materialized once so the selection is not re-evaluated while the loop changes statuses.
        var staleQueries = _reindexJobRecord.QueryList.Keys
            .Where(q => q.Status == OperationStatus.Running && q.LastModified < staleCutoff)
            .ToList();

        foreach (var staleQuery in staleQueries)
        {
            await _jobSemaphore.WaitAsync(_cancellationToken);
            try
            {
                foreach (var (runningQuery, tokenSource) in runningQueries.Values)
                {
                    if (EqualityComparer<ReindexJobQueryStatus>.Default.Equals(runningQuery, staleQuery))
                    {
                        TryCancel(tokenSource);
                    }
                }

                staleQuery.Status = OperationStatus.Queued;
                await UpdateJobAsync();
            }
            finally
            {
                _jobSemaphore.Release();
            }
        }
    }

    // Waits until at least one tracked query task has completed, without blocking a thread.
    // Throws OperationCanceledException if the job's token is cancelled first.
    async Task WaitForAnyQueryAsync()
    {
        using (var waitCancellation = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken))
        {
            await Task.WhenAny(
                Task.WhenAny(runningQueries.Keys),
                Task.Delay(Timeout.Infinite, waitCancellation.Token));

            // End the infinite delay so it does not stay registered on the job's token.
            waitCancellation.Cancel();
        }

        _cancellationToken.ThrowIfCancellationRequested();
    }

    // Removes finished query tasks and disposes their token sources. A failed task is rethrown by awaiting it.
    // A task is not rethrown if it ended cancelled because its own token source was cancelled
    // (by a stale reset or by job cancellation).
    async Task RemoveCompletedQueriesAsync()
    {
        foreach (var finishedTask in runningQueries.Keys.Where(t => t.IsCompleted).ToList())
        {
            var tokenSource = runningQueries[finishedTask].TokenSource;
            runningQueries.Remove(finishedTask);

            bool cancelledByUs = finishedTask.IsCanceled && tokenSource.IsCancellationRequested;
            tokenSource.Dispose();

            if (!cancelledByUs)
            {
                // The task is already complete: this returns immediately or rethrows its failure.
                await finishedTask;
            }
        }
    }

    // Re-reads the job's ETag and status from the store. This is best effort: a failure is logged,
    // and the processing loop carries on with the previously known values.
    async Task RefreshJobStatusAsync()
    {
        // Not cancellable on purpose, so the caller's cancellation check is always reached.
        await _jobSemaphore.WaitAsync();
        try
        {
            using (IScoped<IFhirOperationDataStore> store = _fhirOperationDataStoreFactory.Invoke())
            {
                var wrapper = await store.Value.GetReindexJobByIdAsync(_reindexJobRecord.Id, _cancellationToken);
                _weakETag = wrapper.ETag;
                _reindexJobRecord.Status = wrapper.JobRecord.Status;
            }
        }
        catch (Exception ex)
        {
            // if something went wrong with fetching job status, we shouldn't fail the process loop
            _logger.LogWarning(ex, "Failed to refresh reindex job status; continuing with the last known status.");
        }
        finally
        {
            _jobSemaphore.Release();
        }
    }

    // Best-effort cancellation: callbacks registered on the token may throw, and that must not break the job loop.
    void TryCancel(CancellationTokenSource tokenSource)
    {
        try
        {
            tokenSource.Cancel(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to cancel a reindex query task.");
        }
    }
}