in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [269:372]
/// <summary>
/// Rewinds the most recent execution of an orchestration instance. It overwrites the execution's failure history rows
/// (and the rows that triggered them), recurses into failed sub-orchestrations, and resets the instance's status so that
/// it can be restarted.
/// </summary>
/// <param name="instanceId">The ID of the orchestration instance to rewind.</param>
/// <param name="cancellationToken">Token passed to every history query and table write.</param>
/// <returns>
/// The "failed leaves": the IDs of the deepest failed instances on each failed branch. This instance's own ID is yielded
/// only when none of its sub-orchestrations failed in the rewound execution. Otherwise the leaves yielded by the
/// recursive calls are returned instead.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The instance has no <c>OrchestratorStarted</c> history rows, so there is no execution to rewind.
/// </exception>
public override async IAsyncEnumerable<string> RewindHistoryAsync(string instanceId, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // REWIND ALGORITHM:
    // 1. Finds the most recent execution of the specified orchestration instance
    // 2. Finds failure entities to clear and overwrites them (as well as the corresponding trigger events)
    // 3. Identifies sub-orchestration failure(s) and calls RewindHistoryAsync recursively on the failed child instance(s)
    // 4. Resets the orchestration status of the rewound instance in the instance store table to prepare it to be restarted
    // 5. Yields "failed leaves", the deepest failed instances on each failed branch, to revive with RewindEvent messages
    //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    bool hasFailedSubOrchestrations = false;

    // Instance IDs are caller-supplied (or read back from history rows in the recursive call) and may contain single
    // quotes. Inside an OData string literal, a single quote must be doubled.
    string partitionFilter = $"{nameof(ITableEntity.PartitionKey)} eq '{EscapeODataString(KeySanitation.EscapePartitionKey(instanceId))}'";

    // Step 1: the most recent execution is the one whose OrchestratorStarted row has the greatest RowKey.
    // Compare RowKeys ordinally; the default string comparer used by Max() is culture-sensitive.
    string orchestratorStartedFilter = $"{partitionFilter} and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.OrchestratorStarted)}'";
    IReadOnlyList<TableEntity> orchestratorStartedEntities = await this.QueryHistoryAsync(orchestratorStartedFilter, instanceId, cancellationToken);
    var mostRecentStart = orchestratorStartedEntities
        .OrderByDescending(started => started.RowKey, StringComparer.Ordinal)
        .FirstOrDefault();
    if (mostRecentStart is null)
    {
        throw new InvalidOperationException(
            $"Cannot rewind orchestration instance '{instanceId}' because no {nameof(EventType.OrchestratorStarted)} events were found in its history.");
    }

    string executionId = mostRecentStart.GetString(nameof(OrchestrationInstance.ExecutionId));
    string executionIdFilter = $"{nameof(OrchestrationInstance.ExecutionId)} eq '{EscapeODataString(executionId)}'";
    string executionFilter = $"{partitionFilter} and {executionIdFilter}";

    // Step 2: the failure rows of that execution are: a failed ExecutionCompleted, plus every TaskFailed and
    // SubOrchestrationInstanceFailed event.
    string failuresFilter =
        $"{executionFilter} and (" +
        $"{nameof(ExecutionCompletedEvent.OrchestrationStatus)} eq '{nameof(OrchestrationStatus.Failed)}'" +
        $" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.TaskFailed)}'" +
        $" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.SubOrchestrationInstanceFailed)}')";
    IReadOnlyList<TableEntity> entitiesToClear = await this.QueryHistoryAsync(failuresFilter, instanceId, cancellationToken);

    foreach (TableEntity entity in entitiesToClear)
    {
        if (entity.GetString(nameof(OrchestrationInstance.ExecutionId)) != executionId)
        {
            // the remaining entities are from a previous generation and can be discarded.
            break;
        }

        if (entity.RowKey == SentinelRowKey)
        {
            continue;
        }

        // Filter for the event that triggered this failure: same execution, EventId equal to the failure's TaskScheduledId.
        int? taskScheduledId = entity.GetInt32(nameof(TaskCompletedEvent.TaskScheduledId));
        string triggerEventFilter = $"{executionFilter} and {nameof(HistoryEvent.EventId)} eq {taskScheduledId.GetValueOrDefault()}";

        switch (entity.GetString(nameof(HistoryEvent.EventType)))
        {
            // overwrite the TaskScheduled event corresponding to the TaskFailed event
            case nameof(EventType.TaskFailed):
                string taskScheduledFilter = $"{triggerEventFilter} and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.TaskScheduled)}'";
                IReadOnlyList<TableEntity> taskScheduledEntities = await this.QueryHistoryAsync(taskScheduledFilter, instanceId, cancellationToken);

                // The TaskScheduled row is rewritten (below) before this TaskFailed row (at the end of the loop body).
                // If an earlier rewind was interrupted between the two writes, the TaskScheduled row is already a
                // GenericEvent and no longer matches the query. Skip it rather than failing, so the rewind can be retried.
                if (taskScheduledEntities.Count > 0)
                {
                    TableEntity taskScheduledEntity = taskScheduledEntities[0];
                    taskScheduledEntity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + taskScheduledEntity.GetString(nameof(HistoryEvent.EventType));
                    taskScheduledEntity[nameof(TaskFailedEvent.EventType)] = nameof(EventType.GenericEvent);
                    await this.HistoryTable.ReplaceEntityAsync(taskScheduledEntity, taskScheduledEntity.ETag, cancellationToken);
                }

                break;

            // mark the SubOrchestrationInstanceCreated event corresponding to the SubOrchestrationInstanceFailed event as rewound
            case nameof(EventType.SubOrchestrationInstanceFailed):
                hasFailedSubOrchestrations = true;
                string subOrchestrationCreatedFilter = $"{triggerEventFilter} and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.SubOrchestrationInstanceCreated)}'";
                IReadOnlyList<TableEntity> subOrchestrationEntities = await this.QueryHistoryAsync(subOrchestrationCreatedFilter, instanceId, cancellationToken);

                // the SubOrchestrationInstanceCreated event is still healthy and will not be overwritten, just marked as rewound
                TableEntity subOrchestrationEntity = subOrchestrationEntities[0];
                subOrchestrationEntity[nameof(SubOrchestrationInstanceFailedEvent.Reason)] = "Rewound: " + subOrchestrationEntity.GetString(nameof(HistoryEvent.EventType));
                await this.HistoryTable.ReplaceEntityAsync(subOrchestrationEntity, subOrchestrationEntity.ETag, cancellationToken);

                // Step 3: recursively clear out failure events on the child instance and surface its failed leaves
                string childInstanceId = subOrchestrationEntity.GetString(nameof(OrchestrationInstance.InstanceId));
                await foreach (string failedLeafId in this.RewindHistoryAsync(childInstanceId, cancellationToken))
                {
                    yield return failedLeafId;
                }

                break;
        }

        // "clear" the failure event by rewriting it as a GenericEvent; the row (and its RowKey) is kept rather than deleted
        entity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + entity.GetString(nameof(HistoryEvent.EventType));
        entity[nameof(TaskFailedEvent.EventType)] = nameof(EventType.GenericEvent);
        await this.HistoryTable.ReplaceEntityAsync(entity, entity.ETag, cancellationToken);
    }

    // Step 4: reset orchestration status in instance store table
    await this.UpdateStatusForRewindAsync(instanceId, cancellationToken);

    // Step 5: an instance with failed children is not a leaf; its children's leaves were already yielded above
    if (!hasFailedSubOrchestrations)
    {
        yield return instanceId;
    }

    // Escapes a value for use inside a single-quoted OData string literal. A null value becomes an empty string,
    // which matches what string interpolation produces for null.
    static string EscapeODataString(string value) => value == null ? string.Empty : value.Replace("'", "''");
}