in src/DurableTask.AzureServiceFabric/Stores/ScheduledMessageProvider.cs [91:170]
async Task ProcessScheduledMessages()
{
while (!IsStopped())
{
try
{
var currentTime = DateTime.UtcNow;
var nextCheck = currentTime + TimeSpan.FromSeconds(1);
var builder = this.inMemorySet.ToBuilder();
List<Message<Guid, TaskMessageItem>> activatedMessages = new List<Message<Guid, TaskMessageItem>>();
while (builder.Count > 0)
{
var firstPendingMessage = builder.Min;
var timerEvent = firstPendingMessage.Value.TaskMessage.Event as TimerFiredEvent;
if (timerEvent == null)
{
throw new Exception("Internal Server Error : Ended up adding non TimerFiredEvent TaskMessage as scheduled message");
}
if (timerEvent.FireAt <= currentTime)
{
activatedMessages.Add(firstPendingMessage);
builder.Remove(firstPendingMessage);
}
else
{
nextCheck = timerEvent.FireAt;
break;
}
}
if (IsStopped())
{
break;
}
if (activatedMessages.Count > 0)
{
var keys = activatedMessages.Select(m => m.Key);
var values = activatedMessages.Select(m => m.Value).ToList();
IList<OrchestrationInstance> modifiedSessions = null;
await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
using (var tx = this.StateManager.CreateTransaction())
{
modifiedSessions = await this.sessionProvider.TryAppendMessageBatchAsync(tx, values);
await this.CompleteBatchAsync(tx, keys);
await tx.CommitAsync();
}
}, uniqueActionIdentifier: $"Action = '{nameof(ScheduledMessageProvider)}.{nameof(ProcessScheduledMessages)}'");
lock (@lock)
{
this.inMemorySet = this.inMemorySet.Except(activatedMessages);
}
if (modifiedSessions != null)
{
foreach (var sessionId in modifiedSessions)
{
this.sessionProvider.TryEnqueueSession(sessionId);
}
}
}
this.nextActivationCheck = nextCheck;
await WaitForItemsAsync(this.nextActivationCheck - DateTime.UtcNow);
}
catch (Exception e)
{
ServiceFabricProviderEventSource.Tracing.ExceptionWhileRunningBackgroundJob($"{nameof(ScheduledMessageProvider)}.{nameof(ProcessScheduledMessages)}", e.ToString());
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}
}