public override async IAsyncEnumerable<string> RewindHistoryAsync()

in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [269:372]


        /// <summary>
        /// Rewinds the history of the most recent execution of an orchestration instance by overwriting its
        /// failure events (and the events that triggered them), recursing into failed sub-orchestrations, and
        /// resetting the instance status so that the instance can be restarted.
        /// </summary>
        /// <param name="instanceId">The ID of the orchestration instance to rewind.</param>
        /// <param name="cancellationToken">Token used to cancel the table queries and updates.</param>
        /// <returns>
        /// An asynchronous stream of instance IDs. If the instance has no failed sub-orchestrations, the stream
        /// contains <paramref name="instanceId"/> itself; otherwise it contains the IDs produced by recursively
        /// rewinding each failed sub-orchestration.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// No OrchestratorStarted event exists for the instance, or a failure event has no matching trigger event.
        /// </exception>
        /// <remarks>
        /// This is an async iterator: statements that follow a <c>yield return</c> only run when the caller keeps
        /// enumerating. Callers must enumerate the sequence to completion for the rewind to be fully applied.
        /// </remarks>
        public override async IAsyncEnumerable<string> RewindHistoryAsync(string instanceId, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            // REWIND ALGORITHM:
            // 1. Finds the most recent execution of the specified orchestration instance to rewind
            // 2. Finds failure entities to clear and overwrites them (as well as the corresponding trigger events)
            // 3. Identifies sub-orchestration failure(s) from the parent instance and calls RewindHistoryAsync recursively on the failed child instance(s)
            // 4. Resets the orchestration status of the rewound instance in the instance store table to prepare it to be restarted
            // 5. Yields the "failed leaves": the deepest failed instances on each failed branch, to be revived with RewindEvent messages
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

            bool hasFailedSubOrchestrations = false;
            string partitionFilter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'";

            string orchestratorStartedFilter = $"{partitionFilter} and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.OrchestratorStarted)}'";
            IReadOnlyList<TableEntity> orchestratorStartedEntities = await this.QueryHistoryAsync(orchestratorStartedFilter, instanceId, cancellationToken);

            if (orchestratorStartedEntities.Count == 0)
            {
                // Without a start event there is no execution to rewind; fail with a clear message
                // instead of an index-out-of-range error.
                throw new InvalidOperationException(
                    $"Cannot rewind instance '{instanceId}': no {nameof(EventType.OrchestratorStarted)} history events were found.");
            }

            // Get the most recent OrchestratorStarted event: the row with the greatest RowKey.
            // Keys are compared ordinally so the result does not depend on the current culture, and
            // the first row wins when several rows share the greatest key.
            TableEntity latestStart = orchestratorStartedEntities[0];
            for (int i = 1; i < orchestratorStartedEntities.Count; i++)
            {
                TableEntity candidate = orchestratorStartedEntities[i];
                if (string.CompareOrdinal(candidate.RowKey, latestStart.RowKey) > 0)
                {
                    latestStart = candidate;
                }
            }

            string executionId = latestStart.GetString(nameof(OrchestrationInstance.ExecutionId));
            string executionIdFilter = $"{nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'";

            // Looks up the event (TaskScheduled / SubOrchestrationInstanceCreated) that triggered a failure event.
            async Task<TableEntity> GetTriggerEntityAsync(int triggerEventId, string triggerEventType)
            {
                string triggerFilter =
                    $"{partitionFilter} and {executionIdFilter}" +
                    $" and {nameof(HistoryEvent.EventId)} eq {triggerEventId}" +
                    $" and {nameof(HistoryEvent.EventType)} eq '{triggerEventType}'";

                IReadOnlyList<TableEntity> matches = await this.QueryHistoryAsync(triggerFilter, instanceId, cancellationToken);
                if (matches.Count == 0)
                {
                    throw new InvalidOperationException(
                        $"Cannot rewind instance '{instanceId}': no {triggerEventType} event with {nameof(HistoryEvent.EventId)} {triggerEventId} was found for execution '{executionId}'.");
                }

                return matches[0];
            }

            // Select every failure row of the current execution: the failed completion event plus
            // all failed activities and failed sub-orchestrations.
            string failureFilter =
                $"{partitionFilter} and {executionIdFilter} and (" +
                $"{nameof(ExecutionCompletedEvent.OrchestrationStatus)} eq '{nameof(OrchestrationStatus.Failed)}'" +
                $" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.TaskFailed)}'" +
                $" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.SubOrchestrationInstanceFailed)}'" +
                ")";

            IReadOnlyList<TableEntity> entitiesToClear = await this.QueryHistoryAsync(failureFilter, instanceId, cancellationToken);
            foreach (TableEntity entity in entitiesToClear)
            {
                if (entity.GetString(nameof(OrchestrationInstance.ExecutionId)) != executionId)
                {
                    // the remaining entities are from a previous generation and can be discarded.
                    break;
                }

                if (entity.RowKey == SentinelRowKey)
                {
                    continue;
                }

                string failedEventType = entity.GetString(nameof(HistoryEvent.EventType));

                // ID of the event that scheduled the failed work; defaults to 0 when the property is absent.
                int triggerEventId = entity.GetInt32(nameof(TaskCompletedEvent.TaskScheduledId)).GetValueOrDefault();

                switch (failedEventType)
                {
                    // overwrite the TaskScheduled event corresponding to the TaskFailed event
                    case nameof(EventType.TaskFailed):
                        TableEntity taskScheduledEntity = await GetTriggerEntityAsync(triggerEventId, nameof(EventType.TaskScheduled));
                        taskScheduledEntity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + taskScheduledEntity.GetString(nameof(HistoryEvent.EventType));
                        taskScheduledEntity[nameof(HistoryEvent.EventType)] = nameof(EventType.GenericEvent);
                        await this.HistoryTable.ReplaceEntityAsync(taskScheduledEntity, taskScheduledEntity.ETag, cancellationToken);
                        break;

                    // mark the SubOrchestrationInstanceCreated event corresponding to the SubOrchestrationInstanceFailed event
                    case nameof(EventType.SubOrchestrationInstanceFailed):
                        hasFailedSubOrchestrations = true;

                        // the SubOrchestrationInstanceCreated event is still healthy and will not be overwritten, just marked as rewound
                        TableEntity subOrchestrationEntity = await GetTriggerEntityAsync(triggerEventId, nameof(EventType.SubOrchestrationInstanceCreated));
                        subOrchestrationEntity[nameof(SubOrchestrationInstanceFailedEvent.Reason)] = "Rewound: " + subOrchestrationEntity.GetString(nameof(HistoryEvent.EventType));
                        await this.HistoryTable.ReplaceEntityAsync(subOrchestrationEntity, subOrchestrationEntity.ETag, cancellationToken);

                        // recursive call to clear out failure events on child instances
                        string childId = subOrchestrationEntity.GetString(nameof(OrchestrationInstance.InstanceId));
                        await foreach (string childInstanceId in this.RewindHistoryAsync(childId, cancellationToken))
                        {
                            yield return childInstanceId;
                        }

                        break;
                }

                // "clear" the failure event by turning it into a GenericEvent: the row (and its RowKey) is
                // preserved, but it no longer carries a failure event type.
                entity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + failedEventType;
                entity[nameof(HistoryEvent.EventType)] = nameof(EventType.GenericEvent);

                await this.HistoryTable.ReplaceEntityAsync(entity, entity.ETag, cancellationToken);
            }

            // reset orchestration status in instance store table
            await this.UpdateStatusForRewindAsync(instanceId, cancellationToken);

            // Only a leaf (an instance without failed sub-orchestrations) reports itself; parents
            // have already streamed the leaves found through their failed children.
            if (!hasFailedSubOrchestrations)
            {
                yield return instanceId;
            }
        }