public async override Task CleanEntityStorageAsync()

in src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs [124:209]


        /// <summary>
        /// Scans the instances whose ID starts with "@" (queried with <c>ExcludeEntities = false</c>) and,
        /// depending on <paramref name="request"/>, purges empty entities and/or sends lock-release
        /// messages for locks whose owner is no longer a running or suspended orchestration.
        /// </summary>
        /// <param name="request">
        /// Selects which clean-up actions to perform (<c>RemoveEmptyEntities</c>, <c>ReleaseOrphanedLocks</c>)
        /// and optionally carries a <c>ContinuationToken</c> from which the scan resumes.
        /// </param>
        /// <param name="cancellation">Token passed to the paged state query.</param>
        /// <returns>
        /// The number of entities removed, the number of orphaned locks released, and a continuation token.
        /// The token is non-null when the scan stopped because the time limit
        /// (<c>timeLimitForCleanEntityStorageLoop</c>) elapsed before the last page was reached.
        /// </returns>
        public async override Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken))
        {
            DateTime now = DateTime.UtcNow;
            string? continuationToken = request.ContinuationToken;
            int emptyEntitiesRemoved = 0;
            int orphanedLocksReleased = 0;
            var stopwatch = Stopwatch.StartNew();

            // Query by the "@" instance-id prefix; inputs and outputs are not fetched because
            // only the status of each instance is inspected below.
            var condition = new OrchestrationInstanceStatusQueryCondition()
            {
                InstanceIdPrefix = "@",
                FetchInput = false,
                FetchOutput = false,
                ExcludeEntities = false,
            };

            await this.ensureTaskHub();

            // Purges the history of an entity that was found to be empty and idle.
            // The counters are updated with Interlocked because the actions of one page run concurrently.
            async Task DeleteIdleOrchestrationEntity(OrchestrationState state)
            {
                PurgeHistoryResult result = await this.trackingStore.PurgeInstanceHistoryAsync(state.OrchestrationInstance.InstanceId);
                Interlocked.Add(ref emptyEntitiesRemoved, result.InstancesDeleted);
            }

            // Looks up the instance that holds the lock; if it is missing or is neither running
            // nor suspended, sends an unlock message to the entity on its behalf.
            async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner)
            {
                OrchestrationState? ownerState
                    = await this.trackingStore.GetStateAsync(lockOwner, allExecutions: false, fetchInput: false).FirstOrDefaultAsync();

                // A null status (owner not found) compares unequal to both values and therefore counts as "not running".
                bool OrchestrationIsRunning(OrchestrationStatus? ownerStatus)
                    => ownerStatus == OrchestrationStatus.Running || ownerStatus == OrchestrationStatus.Suspended;

                if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus))
                {
                    // the owner is not a running orchestration. Send a lock release.
                    EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner);
                    await this.sendEvent(eventToSend.AsTaskMessage());
                    Interlocked.Increment(ref orphanedLocksReleased);
                }
            }

            // list all entities (without fetching the input) and for each one that requires action,
            // perform that action. Waits for all actions to finish after each page.
            do
            {
                // Fetch a single page (up to 100 entries) starting at the current continuation token.
                Page<OrchestrationState>? states = await this.trackingStore.GetStateAsync(condition, cancellation).AsPages(continuationToken, 100).FirstOrDefaultAsync();
                DurableStatusQueryResult page = states != null
                    ? new DurableStatusQueryResult { ContinuationToken = states.ContinuationToken, OrchestrationState = states.Values }
                    : new DurableStatusQueryResult { OrchestrationState = Array.Empty<OrchestrationState>() };
                continuationToken = page.ContinuationToken;

                var tasks = new List<Task>();
                foreach (OrchestrationState state in page.OrchestrationState)
                {
                    EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
                    if (status == null)
                    {
                        // no entity status could be obtained for this instance; nothing to clean up
                        continue;
                    }

                    if (request.ReleaseOrphanedLocks && status.LockedBy != null)
                    {
                        tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
                    }

                    if (request.RemoveEmptyEntities)
                    {
                        bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.BacklogQueueSize == 0;

                        // only remove entities that have been idle for longer than the configured reorder window
                        bool safeToRemoveWithoutBreakingMessageSorterLogic =
                            (now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow);

                        if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
                        {
                            tasks.Add(DeleteIdleOrchestrationEntity(state));
                        }
                    }
                }

                await Task.WhenAll(tasks);
            }
            while (continuationToken != null && stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop);

            return new CleanEntityStorageResult()
            {
                EmptyEntitiesRemoved = emptyEntitiesRemoved,
                OrphanedLocksReleased = orphanedLocksReleased,
                ContinuationToken = continuationToken,
            };
        }