in src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs [716:788]
/// <summary>
/// Enumerates all entity instances (instance ids starting with "@") page by page and, depending on the
/// flags, releases locks whose owner is not found among the running instances and purges entities that
/// are empty. All actions started for a page are awaited before the next page is fetched.
/// </summary>
/// <param name="removeEmptyEntities">
/// If true, purges entities that do not exist, are not locked, and have an empty queue. When the durability
/// provider does not guarantee ordered delivery, an entity is only purged once it has been idle for longer
/// than the configured entity message reorder window.
/// </param>
/// <param name="releaseOrphanedLocks">
/// If true, sends a lock release message to every locked entity whose lock owner is not found among the
/// running instances.
/// </param>
/// <param name="cancellationToken">Token passed to the instance status queries.</param>
/// <returns>A result carrying the number of empty entities removed and orphaned locks released.</returns>
private async Task<CleanEntityStorageResult> CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
    // Captured once so that every entity is judged against the same point in time.
    DateTime now = DateTime.UtcNow;
    CleanEntityStorageResult finalResult = default;

    var condition = new OrchestrationStatusQueryCondition()
    {
        InstanceIdPrefix = "@",
        ShowInput = false,
    };

    // list all entities (without fetching the input) and for each one that requires action,
    // perform that action. Waits for all actions to finish after each page.
    do
    {
        var page = await this.DurabilityProvider.GetOrchestrationStateWithPagination(condition, cancellationToken);

        List<Task> tasks = new List<Task>();
        foreach (var state in page.DurableOrchestrationState)
        {
            EntityStatus status = this.messageDataConverter.Deserialize<EntityStatus>(state.CustomStatus.ToString());

            if (releaseOrphanedLocks && status.LockedBy != null)
            {
                tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
            }

            if (removeEmptyEntities)
            {
                bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;

                // Without ordered delivery, only remove entities that have been idle for longer than the reorder window.
                bool safeToRemoveWithoutBreakingMessageSorterLogic = this.durabilityProvider.GuaranteesOrderedDelivery
                    || (now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.durableTaskOptions.EntityMessageReorderWindowInMinutes));

                if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
                {
                    tasks.Add(DeleteIdleOrchestrationEntity(state));
                }
            }
        }

        await Task.WhenAll(tasks);
        condition.ContinuationToken = page.ContinuationToken;
    }
    while (condition.ContinuationToken != null);

    return finalResult;

    // Purges the given entity instance and counts it as removed.
    async Task DeleteIdleOrchestrationEntity(DurableOrchestrationStatus status)
    {
        await this.DurabilityProvider.PurgeInstanceHistoryByInstanceId(status.InstanceId);

        // Several of these tasks may complete concurrently, so the counter is updated atomically.
        Interlocked.Increment(ref finalResult.NumberOfEmptyEntitiesRemoved);
    }

    // Looks for a running instance whose id equals the lock owner; if none is found,
    // sends a lock release to the entity and counts the lock as removed.
    async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, string lockOwner)
    {
        var findRunningOwner = new OrchestrationStatusQueryCondition()
        {
            InstanceIdPrefix = lockOwner,
            ShowInput = false,
            RuntimeStatus = RunningStatus,
        };

        // The query matches by prefix, so other running instances may share the prefix and the exact
        // owner is not guaranteed to be on the first page. Follow the continuation token until the
        // owner is found or the results are exhausted, so a live lock is never released by mistake.
        do
        {
            var result = await this.DurabilityProvider.GetOrchestrationStateWithPagination(findRunningOwner, cancellationToken);
            if (result.DurableOrchestrationState.Any(candidate => candidate.InstanceId == lockOwner))
            {
                // the owner is still running, so the lock is not orphaned.
                return;
            }

            findRunningOwner.ContinuationToken = result.ContinuationToken;
        }
        while (findRunningOwner.ContinuationToken != null);

        // the owner is not a running orchestration. Send a lock release.
        var message = new ReleaseMessage()
        {
            ParentInstanceId = lockOwner,
            LockRequestId = "fix-orphaned-lock", // we don't know the original id but it does not matter
        };
        await this.RaiseEventInternalAsync(this.client, this.TaskHubName, status.InstanceId, EntityMessageEventNames.ReleaseMessageEventName, message, checkStatusFirst: false);
        Interlocked.Increment(ref finalResult.NumberOfOrphanedLocksRemoved);
    }
}