in edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs [289:390]
private async Task CleanQueue(bool checkEntireQueueOnCleanup)
{
long totalCleanupCount = 0;
long totalCleanupStoreCount = 0;
while (true)
{
foreach (KeyValuePair<string, ISequentialStore<MessageRef>> endpointSequentialStore in this.messageStore.endpointSequentialStores)
{
var messageQueueId = endpointSequentialStore.Key;
try
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}
var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId);
Events.CleanupTaskStarted(messageQueueId);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;
async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
{
return false;
}
var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);
await message.ForEachAsync(async msg =>
{
if (msg.RefCount == 0)
{
if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
{
this.expiredCounter.Increment(1, new[] { "ttl_expiry", msg.Message.GetSenderId(), msg.Message.GetOutput(), bool.TrueString });
}
await this.messageStore.messageEntityStore.Remove(messageRef.EdgeMessageId);
cleanupEntityStoreCount++;
}
});
return true;
}
// With the addition of PriorityQueues, the CleanupProcessor assumptions change slightly:
// Previously, we could always assume that if a message at the head of the queue should not be deleted,
// then none of the other messages in the queue should be either. Now, because we can have different TTL's
// for messages within the same queue, there can be messages that have expired in the queue after the head.
// The checkEntireQueueOnCleanup flag is an environment variable for edgeHub. If it is set to true, we will
// check the entire queue every time cleanup processor runs. If it is set to false, we just remove the oldest
// items in the queue until we get to one that is not expired.
int cleanupCount = 0;
if (checkEntireQueueOnCleanup)
{
IEnumerable<(long, MessageRef)> batch;
long offset = sequentialStore.GetHeadOffset(this.cancellationTokenSource.Token);
do
{
batch = await sequentialStore.GetBatch(offset, CleanupBatchSize);
foreach ((long, MessageRef) messageWithOffset in batch)
{
if (await sequentialStore.RemoveOffset(DeleteMessageCallback, messageWithOffset.Item1, this.cancellationTokenSource.Token))
{
cleanupCount++;
}
}
offset += CleanupBatchSize;
}
while (batch.Any());
}
else
{
while (await sequentialStore.RemoveFirst(DeleteMessageCallback))
{
cleanupCount++;
}
}
// Since we will have the total *true* count of messages in the store,
// we need to set the counter here to the total count of messages in the store.
Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority);
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
}
catch (Exception ex)
{
Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId);
}
}
await Task.Delay(this.GetCleanupTaskSleepTime());
}
}