private async Task ProcessJob()

in src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobTask.cs [314:432]


        /// <summary>
        /// Runs the reindex job's queries until none is left in the <see cref="OperationStatus.Queued"/> or
        /// <see cref="OperationStatus.Running"/> state. Each iteration:
        /// starts at most one queued query without awaiting it;
        /// re-queues running queries whose <c>LastModified</c> is older than the heartbeat timeout;
        /// applies the throttle delay;
        /// once <c>MaximumConcurrency</c> query tasks are tracked, waits for at least one of them to finish;
        /// and re-reads the job record so that a cancellation requested elsewhere is noticed.
        /// </summary>
        /// <exception cref="OperationCanceledException">
        /// The job's cancellation token was signalled, or the stored job record was marked canceled.
        /// </exception>
        /// <exception cref="AggregateException">
        /// A query task that was still tracked when the loop ended faulted or was canceled.
        /// </exception>
        private async Task ProcessJob()
        {
            // One entry per started ProcessQueryAsync call. Keeping the query and its token source next to the task
            // (instead of reading the query back by awaiting the task) means a faulted or cancelled query task cannot
            // abort this loop, and every token source can be disposed once its task is done.
            var runningQueries = new List<(Task<ReindexJobQueryStatus> QueryTask, ReindexJobQueryStatus Query, CancellationTokenSource TokenSource)>();

            // The token source of the most recent run of each query, used to cancel that run if the query goes stale.
            var queryCancellationTokens = new Dictionary<ReindexJobQueryStatus, CancellationTokenSource>();

            // while not all queries are finished
            while (_reindexJobRecord.QueryList.Keys.Any(q =>
                q.Status == OperationStatus.Queued ||
                q.Status == OperationStatus.Running))
            {
                if (_reindexJobRecord.QueryList.Keys.Any(q => q.Status == OperationStatus.Queued))
                {
                    // grab the queued query with the oldest LastModified and run it
                    var query = _reindexJobRecord.QueryList.Keys.Where(q => q.Status == OperationStatus.Queued).OrderBy(q => q.LastModified).FirstOrDefault();

                    // Linked to the job token so that cancelling the job also cancels in-flight queries, even when the
                    // cancellation surfaces as an exception from one of the awaits below instead of reaching the
                    // explicit cancellation check at the end of this loop.
                    var queryTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken);

                    // Assign rather than TryAdd: a stale query that was re-queued still maps to the source of its previous
                    // (already cancelled) run, and the new run must replace it so that it can be cancelled later.
                    queryCancellationTokens[query] = queryTokenSource;

                    // We don't await ProcessQueryAsync, so the query status may or may not be changed inside it immediately.
                    // Otherwise the next pass through the loop could pick the same query from the query list again,
                    // so the query is marked as running here rather than inside ProcessQueryAsync.
                    query.Status = OperationStatus.Running;
                    query.LastModified = Clock.UtcNow;

                    // Intentionally not awaited: the loop continues and may start further queries while this one runs.
                    var queryTask = ProcessQueryAsync(query, queryTokenSource.Token);
                    runningQueries.Add((queryTask, query, queryTokenSource));

                    _logger.LogInformation($"Reindex job task created {runningQueries.Count} Tasks");
                }

                // reset stale queries to queued; the list is materialized because statuses are changed while iterating
                var staleCutoff = Clock.UtcNow - _reindexJobConfiguration.JobHeartbeatTimeoutThreshold;
                var staleQueries = _reindexJobRecord.QueryList.Keys
                    .Where(q => q.Status == OperationStatus.Running && q.LastModified < staleCutoff)
                    .ToList();

                foreach (var staleQuery in staleQueries)
                {
                    await _jobSemaphore.WaitAsync(_cancellationToken);
                    try
                    {
                        // if this query has a created task, cancel it before the query is re-queued
                        if (queryCancellationTokens.TryGetValue(staleQuery, out var tokenSource))
                        {
                            try
                            {
                                tokenSource.Cancel(false);
                            }
                            catch (Exception ex)
                            {
                                // Best effort: the query is re-queued whether or not its old run could be cancelled.
                                _logger.LogWarning(ex, "Failed to cancel the task of a stale reindex query.");
                            }
                        }

                        staleQuery.Status = OperationStatus.Queued;
                        await UpdateJobAsync();
                    }
                    finally
                    {
                        _jobSemaphore.Release();
                    }
                }

                var averageDbConsumption = _throttleController.UpdateDatastoreUsage();
                _logger.LogInformation($"Reindex average DB consumption: {averageDbConsumption}");
                var throttleDelayTime = _throttleController.GetThrottleBasedDelay();
                _logger.LogInformation($"Reindex throttle delay: {throttleDelayTime}");
                await Task.Delay(_reindexJobRecord.QueryDelayIntervalInMilliseconds + throttleDelayTime, _cancellationToken);

                // At the concurrency limit, wait (without blocking a thread) until at least one query task finishes,
                // then drop every finished task and dispose its token source.
                // The Count > 0 guard is needed because Task.WhenAny rejects an empty collection.
                if (runningQueries.Count > 0 && runningQueries.Count >= _reindexJobRecord.MaximumConcurrency)
                {
                    await WaitForTaskOrCancellationAsync(Task.WhenAny(runningQueries.Select(r => r.QueryTask)));
                    RemoveCompletedQueries(runningQueries, queryCancellationTokens);
                }

                // for most cases if another process updates the job (such as a DELETE request)
                // the _etag change will cause a JobConflict exception and this task will be aborted
                // but here we add one more check before attempting to mark the job as complete,
                // or starting another iteration of the loop
                await _jobSemaphore.WaitAsync(_cancellationToken);
                try
                {
                    using (IScoped<IFhirOperationDataStore> store = _fhirOperationDataStoreFactory.Invoke())
                    {
                        var wrapper = await store.Value.GetReindexJobByIdAsync(_reindexJobRecord.Id, _cancellationToken);
                        _weakETag = wrapper.ETag;
                        _reindexJobRecord.Status = wrapper.JobRecord.Status;
                    }
                }
                catch (Exception ex)
                {
                    // if something went wrong with fetching job status, we shouldn't fail process loop;
                    // the last known status is used for the cancellation check below.
                    _logger.LogWarning(ex, "Failed to refresh the reindex job status.");
                }
                finally
                {
                    _jobSemaphore.Release();
                }

                // if our received CancellationToken is cancelled, or the job has been marked canceled we should
                // pass that cancellation request onto the token sources of all query tasks that are still tracked
                if (_cancellationToken.IsCancellationRequested || _reindexJobRecord.Status == OperationStatus.Canceled)
                {
                    foreach (var running in runningQueries)
                    {
                        running.TokenSource.Cancel(false);
                    }

                    _logger.LogInformation("Reindex Job canceled.");
                    throw new OperationCanceledException("ReindexJob canceled.");
                }
            }

            // Wait for the remaining query tasks without blocking a thread.
            while (runningQueries.Any(r => !r.QueryTask.IsCompleted))
            {
                await WaitForTaskOrCancellationAsync(
                    Task.WhenAny(runningQueries.Where(r => !r.QueryTask.IsCompleted).Select(r => r.QueryTask)));
            }

            try
            {
                // Every tracked task is complete here, so this returns immediately. It is kept only to surface
                // failed or cancelled query tasks as an AggregateException, as this method always has.
                Task.WaitAll(runningQueries.Select(r => r.QueryTask).ToArray(), _cancellationToken);
            }
            finally
            {
                RemoveCompletedQueries(runningQueries, queryCancellationTokens);
            }
        }

        /// <summary>
        /// Asynchronously waits until <paramref name="task"/> completes or the job's cancellation token is signalled,
        /// without blocking a thread. The outcome (result or exception) of <paramref name="task"/> is not observed here.
        /// </summary>
        /// <param name="task">The task to wait for.</param>
        /// <exception cref="OperationCanceledException">The job's cancellation token was signalled.</exception>
        private async Task WaitForTaskOrCancellationAsync(Task task)
        {
            var cancellationSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            // If the token is already cancelled, Register runs the callback immediately, so the wait ends at once.
            using (_cancellationToken.Register(() => cancellationSignal.TrySetResult(true)))
            {
                await Task.WhenAny(task, cancellationSignal.Task);
            }

            _cancellationToken.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// Removes every completed entry from <paramref name="runningQueries"/>, logs faulted query tasks,
        /// forgets the entry's token source in <paramref name="queryCancellationTokens"/> (unless a newer run of the
        /// same query has replaced it), and disposes that token source.
        /// </summary>
        /// <param name="runningQueries">The tracked query tasks with their queries and token sources.</param>
        /// <param name="queryCancellationTokens">The token source of the most recent run of each query.</param>
        private void RemoveCompletedQueries(
            List<(Task<ReindexJobQueryStatus> QueryTask, ReindexJobQueryStatus Query, CancellationTokenSource TokenSource)> runningQueries,
            Dictionary<ReindexJobQueryStatus, CancellationTokenSource> queryCancellationTokens)
        {
            // Iterate backwards so RemoveAt does not shift entries that have not been visited yet.
            for (int i = runningQueries.Count - 1; i >= 0; i--)
            {
                var (queryTask, query, tokenSource) = runningQueries[i];
                if (!queryTask.IsCompleted)
                {
                    continue;
                }

                runningQueries.RemoveAt(i);

                if (queryTask.IsFaulted)
                {
                    // If the query was left Running, the stale-query check re-queues it once its LastModified
                    // is older than the heartbeat timeout.
                    _logger.LogWarning(queryTask.Exception, "Reindex query task failed.");
                }

                if (queryCancellationTokens.TryGetValue(query, out var currentSource) && ReferenceEquals(currentSource, tokenSource))
                {
                    queryCancellationTokens.Remove(query);
                }

                tokenSource.Dispose();
            }
        }