private async Task CleanEntityStorageLegacyAsync()

in src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs [716:788]


        /// <summary>
        /// Pages through every instance whose ID starts with "@" (the entity instances) and, depending
        /// on the flags, releases entity locks whose owner is no longer running and/or purges entities
        /// that are empty. All actions started for a page are awaited before the next page is fetched.
        /// </summary>
        /// <param name="removeEmptyEntities">
        /// When true, purges entities whose status reports that they do not exist, are not locked and
        /// have an empty queue, provided it is safe with respect to message reordering.
        /// </param>
        /// <param name="releaseOrphanedLocks">
        /// When true, sends a lock-release message to every entity that is locked by an instance
        /// for which no running orchestration with that exact instance ID can be found.
        /// </param>
        /// <param name="cancellationToken">Token passed to the instance status queries.</param>
        /// <returns>The number of empty entities removed and the number of orphaned locks released.</returns>
        private async Task<CleanEntityStorageResult> CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
        {
            DateTime now = DateTime.UtcNow;
            CleanEntityStorageResult finalResult = default;

            var condition = new OrchestrationStatusQueryCondition()
            {
                InstanceIdPrefix = "@",
                ShowInput = false,
            };

            // Purges the history of an empty entity and counts it. The counter is updated with
            // Interlocked because several of these tasks run concurrently for the same page.
            async Task DeleteIdleOrchestrationEntity(DurableOrchestrationStatus status)
            {
                await this.DurabilityProvider.PurgeInstanceHistoryByInstanceId(status.InstanceId);
                Interlocked.Increment(ref finalResult.NumberOfEmptyEntitiesRemoved);
            }

            // Looks for a running orchestration whose instance ID is exactly lockOwner. If there is none,
            // the lock is considered orphaned and a release message is sent to the entity.
            async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, string lockOwner)
            {
                var findRunningOwner = new OrchestrationStatusQueryCondition()
                {
                    InstanceIdPrefix = lockOwner,
                    ShowInput = false,
                    RuntimeStatus = RunningStatus,
                };

                // The query matches by prefix and is paginated, so the owner is not guaranteed to be on
                // the first page (a page may even be empty while still carrying a continuation token).
                // Keep following continuation tokens until the owner is found or the results are exhausted;
                // otherwise a lock that is still legitimately held could be released.
                bool ownerIsRunning;
                do
                {
                    var ownerPage = await this.DurabilityProvider.GetOrchestrationStateWithPagination(findRunningOwner, cancellationToken);
                    ownerIsRunning = ownerPage.DurableOrchestrationState.Any(candidate => candidate.InstanceId == lockOwner);
                    findRunningOwner.ContinuationToken = ownerPage.ContinuationToken;
                }
                while (!ownerIsRunning && findRunningOwner.ContinuationToken != null);

                if (!ownerIsRunning)
                {
                    // the owner is not a running orchestration. Send a lock release.
                    var message = new ReleaseMessage()
                    {
                        ParentInstanceId = lockOwner,
                        LockRequestId = "fix-orphaned-lock", // we don't know the original id but it does not matter
                    };
                    await this.RaiseEventInternalAsync(this.client, this.TaskHubName, status.InstanceId, EntityMessageEventNames.ReleaseMessageEventName, message, checkStatusFirst: false);
                    Interlocked.Increment(ref finalResult.NumberOfOrphanedLocksRemoved);
                }
            }

            // list all entities (without fetching the input) and for each one that requires action,
            // perform that action. Waits for all actions to finish after each page.
            do
            {
                var page = await this.DurabilityProvider.GetOrchestrationStateWithPagination(condition, cancellationToken);

                List<Task> tasks = new List<Task>();
                foreach (var state in page.DurableOrchestrationState)
                {
                    // An instance without a usable custom status cannot be classified as locked or empty,
                    // so it is skipped instead of failing the whole cleanup with a NullReferenceException.
                    string serializedStatus = state.CustomStatus?.ToString();
                    if (string.IsNullOrWhiteSpace(serializedStatus))
                    {
                        continue;
                    }

                    EntityStatus status = this.messageDataConverter.Deserialize<EntityStatus>(serializedStatus);
                    if (status == null)
                    {
                        continue;
                    }

                    if (releaseOrphanedLocks && status.LockedBy != null)
                    {
                        tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
                    }

                    if (removeEmptyEntities)
                    {
                        bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;

                        // Without ordered delivery, an entity is only removed once it has been idle for
                        // longer than the configured message reorder window.
                        bool safeToRemoveWithoutBreakingMessageSorterLogic = this.durabilityProvider.GuaranteesOrderedDelivery
                            || (now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.durableTaskOptions.EntityMessageReorderWindowInMinutes));

                        if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
                        {
                            tasks.Add(DeleteIdleOrchestrationEntity(state));
                        }
                    }
                }

                await Task.WhenAll(tasks);
                condition.ContinuationToken = page.ContinuationToken;
            }
            while (condition.ContinuationToken != null);

            return finalResult;
        }