private async Task CleanQueue()

in edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs [289:390]


            private async Task CleanQueue(bool checkEntireQueueOnCleanup)
            {
                long totalCleanupCount = 0;
                long totalCleanupStoreCount = 0;
                while (true)
                {
                    foreach (KeyValuePair<string, ISequentialStore<MessageRef>> endpointSequentialStore in this.messageStore.endpointSequentialStores)
                    {
                        var messageQueueId = endpointSequentialStore.Key;
                        try
                        {
                            if (this.cancellationTokenSource.IsCancellationRequested)
                            {
                                return;
                            }

                            var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId);
                            Events.CleanupTaskStarted(messageQueueId);
                            CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
                            ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
                            Events.CleanupCheckpointState(messageQueueId, checkpointData);
                            int cleanupEntityStoreCount = 0;

                            async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
                            {
                                var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
                                if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
                                {
                                    return false;
                                }

                                var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);

                                await message.ForEachAsync(async msg =>
                                {
                                    if (msg.RefCount == 0)
                                    {
                                        if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
                                        {
                                            this.expiredCounter.Increment(1, new[] { "ttl_expiry", msg.Message.GetSenderId(), msg.Message.GetOutput(), bool.TrueString });
                                        }

                                        await this.messageStore.messageEntityStore.Remove(messageRef.EdgeMessageId);
                                        cleanupEntityStoreCount++;
                                    }
                                });

                                return true;
                            }

                            // With the addition of PriorityQueues, the CleanupProcessor assumptions change slightly:
                            // Previously, we could always assume that if a message at the head of the queue should not be deleted,
                            // then none of the other messages in the queue should be either. Now, because we can have different TTL's
                            // for messages within the same queue, there can be messages that have expired in the queue after the head.
                            // The checkEntireQueueOnCleanup flag is an environment variable for edgeHub. If it is set to true, we will
                            // check the entire queue every time cleanup processor runs. If it is set to false, we just remove the oldest
                            // items in the queue until we get to one that is not expired.
                            int cleanupCount = 0;
                            if (checkEntireQueueOnCleanup)
                            {
                                IEnumerable<(long, MessageRef)> batch;
                                long offset = sequentialStore.GetHeadOffset(this.cancellationTokenSource.Token);
                                do
                                {
                                    batch = await sequentialStore.GetBatch(offset, CleanupBatchSize);
                                    foreach ((long, MessageRef) messageWithOffset in batch)
                                    {
                                        if (await sequentialStore.RemoveOffset(DeleteMessageCallback, messageWithOffset.Item1, this.cancellationTokenSource.Token))
                                        {
                                            cleanupCount++;
                                        }
                                    }

                                    offset += CleanupBatchSize;
                                }
                                while (batch.Any());
                            }
                            else
                            {
                                while (await sequentialStore.RemoveFirst(DeleteMessageCallback))
                                {
                                    cleanupCount++;
                                }
                            }

                            // Since we will have the total *true* count of messages in the store,
                            // we need to set the counter here to the total count of messages in the store.
                            Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority);

                            totalCleanupCount += cleanupCount;
                            totalCleanupStoreCount += cleanupEntityStoreCount;
                            Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
                        }
                        catch (Exception ex)
                        {
                            Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId);
                        }
                    }

                    await Task.Delay(this.GetCleanupTaskSleepTime());
                }
            }