in src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs [124:209]
public async override Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken))
{
DateTime now = DateTime.UtcNow;
string? continuationToken = request.ContinuationToken;
int emptyEntitiesRemoved = 0;
int orphanedLocksReleased = 0;
var stopwatch = Stopwatch.StartNew();
var condition = new OrchestrationInstanceStatusQueryCondition()
{
InstanceIdPrefix = "@",
FetchInput = false,
FetchOutput = false,
ExcludeEntities = false,
};
await this.ensureTaskHub();
// list all entities (without fetching the input) and for each one that requires action,
// perform that action. Waits for all actions to finish after each page.
do
{
Page<OrchestrationState>? states = await this.trackingStore.GetStateAsync(condition, cancellation).AsPages(continuationToken, 100).FirstOrDefaultAsync();
DurableStatusQueryResult page = states != null
? new DurableStatusQueryResult { ContinuationToken = states.ContinuationToken, OrchestrationState = states.Values }
: new DurableStatusQueryResult { OrchestrationState = Array.Empty<OrchestrationState>() };
continuationToken = page.ContinuationToken;
var tasks = new List<Task>();
foreach (OrchestrationState state in page.OrchestrationState)
{
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
if (status != null)
{
if (request.ReleaseOrphanedLocks && status.LockedBy != null)
{
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}
if (request.RemoveEmptyEntities)
{
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.BacklogQueueSize == 0;
bool safeToRemoveWithoutBreakingMessageSorterLogic =
(now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow);
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
}
}
}
async Task DeleteIdleOrchestrationEntity(OrchestrationState state)
{
PurgeHistoryResult result = await this.trackingStore.PurgeInstanceHistoryAsync(state.OrchestrationInstance.InstanceId);
Interlocked.Add(ref emptyEntitiesRemoved, result.InstancesDeleted);
}
async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner)
{
OrchestrationState? ownerState
= await this.trackingStore.GetStateAsync(lockOwner, allExecutions: false, fetchInput: false).FirstOrDefaultAsync();
bool OrchestrationIsRunning(OrchestrationStatus? status)
=> status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended);
if (! OrchestrationIsRunning(ownerState?.OrchestrationStatus))
{
// the owner is not a running orchestration. Send a lock release.
EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner);
await this.sendEvent(eventToSend.AsTaskMessage());
Interlocked.Increment(ref orphanedLocksReleased);
}
}
await Task.WhenAll(tasks);
}
while (continuationToken != null & stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop);
return new CleanEntityStorageResult()
{
EmptyEntitiesRemoved = emptyEntitiesRemoved,
OrphanedLocksReleased = orphanedLocksReleased,
ContinuationToken = continuationToken,
};
}