async Task ProcessScheduledMessages()

in src/DurableTask.AzureServiceFabric/Stores/ScheduledMessageProvider.cs [91:170]


        async Task ProcessScheduledMessages()
        {
            while (!IsStopped())
            {
                try
                {
                    var currentTime = DateTime.UtcNow;
                    var nextCheck = currentTime + TimeSpan.FromSeconds(1);

                    var builder = this.inMemorySet.ToBuilder();
                    List<Message<Guid, TaskMessageItem>> activatedMessages = new List<Message<Guid, TaskMessageItem>>();

                    while (builder.Count > 0)
                    {
                        var firstPendingMessage = builder.Min;
                        var timerEvent = firstPendingMessage.Value.TaskMessage.Event as TimerFiredEvent;

                        if (timerEvent == null)
                        {
                            throw new Exception("Internal Server Error : Ended up adding non TimerFiredEvent TaskMessage as scheduled message");
                        }

                        if (timerEvent.FireAt <= currentTime)
                        {
                            activatedMessages.Add(firstPendingMessage);
                            builder.Remove(firstPendingMessage);
                        }
                        else
                        {
                            nextCheck = timerEvent.FireAt;
                            break;
                        }
                    }

                    if (IsStopped())
                    {
                        break;
                    }

                    if (activatedMessages.Count > 0)
                    {
                        var keys = activatedMessages.Select(m => m.Key);
                        var values = activatedMessages.Select(m => m.Value).ToList();

                        IList<OrchestrationInstance> modifiedSessions = null;

                        await RetryHelper.ExecuteWithRetryOnTransient(async () =>
                        {
                            using (var tx = this.StateManager.CreateTransaction())
                            {
                                modifiedSessions = await this.sessionProvider.TryAppendMessageBatchAsync(tx, values);
                                await this.CompleteBatchAsync(tx, keys);
                                await tx.CommitAsync();
                            }
                        }, uniqueActionIdentifier: $"Action = '{nameof(ScheduledMessageProvider)}.{nameof(ProcessScheduledMessages)}'");

                        lock (@lock)
                        {
                            this.inMemorySet = this.inMemorySet.Except(activatedMessages);
                        }

                        if (modifiedSessions != null)
                        {
                            foreach (var sessionId in modifiedSessions)
                            {
                                this.sessionProvider.TryEnqueueSession(sessionId);
                            }
                        }
                    }

                    this.nextActivationCheck = nextCheck;
                    await WaitForItemsAsync(this.nextActivationCheck - DateTime.UtcNow);
                }
                catch (Exception e)
                {
                    ServiceFabricProviderEventSource.Tracing.ExceptionWhileRunningBackgroundJob($"{nameof(ScheduledMessageProvider)}.{nameof(ProcessScheduledMessages)}", e.ToString());
                    await Task.Delay(TimeSpan.FromMilliseconds(100));
                }
            }
        }