src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs (979 lines of code) (raw):

// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- namespace DurableTask.AzureStorage.Tracking { using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Reflection; using System.Runtime.CompilerServices; using System.Runtime.Serialization; using System.Text; using System.Threading; using System.Threading.Tasks; using Azure; using Azure.Data.Tables; using DurableTask.AzureStorage.Linq; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Storage; using DurableTask.Core; using DurableTask.Core.History; /// <summary> /// Tracking store for use with <see cref="AzureStorageOrchestrationService"/>. Uses Azure Tables and Azure Blobs to store runtime state. 
/// </summary>
    class AzureTableTrackingStore : TrackingStoreBase
    {
        const string NameProperty = "Name";
        const string InputProperty = "Input";
        const string ResultProperty = "Result";
        const string OutputProperty = "Output";
        const string RowKeyProperty = nameof(ITableEntity.RowKey);
        const string PartitionKeyProperty = nameof(ITableEntity.PartitionKey);
        const string TimestampProperty = nameof(ITableEntity.Timestamp);
        const string SentinelRowKey = "sentinel";
        const string IsCheckpointCompleteProperty = "IsCheckpointComplete";
        const string CheckpointCompletedTimestampProperty = "CheckpointCompletedTimestamp";

        // See https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model#property-types
        const int MaxTablePropertySizeInBytes = 60 * 1024; // 60KB to give buffer

        static readonly string[] VariableSizeEntityProperties = new[]
        {
            NameProperty,
            InputProperty,
            ResultProperty,
            OutputProperty,
            "Reason",
            "Details",
            "Correlation",
            "FailureDetails",
            "Tags",
        };

        readonly string storageAccountName;
        readonly string taskHubName;
        readonly AzureStorageClient azureStorageClient;
        readonly AzureStorageOrchestrationServiceSettings settings;
        readonly AzureStorageOrchestrationServiceStats stats;
        readonly IReadOnlyDictionary<EventType, Type> eventTypeMap;
        readonly MessageManager messageManager;

        /// <summary>
        /// Creates a tracking store bound to the history and instances tables named in the
        /// settings of <paramref name="azureStorageClient"/>.
        /// </summary>
        /// <param name="azureStorageClient">Client that supplies settings, stats, and table references.</param>
        /// <param name="messageManager">Message manager used for large (blob-backed) payloads.</param>
        public AzureTableTrackingStore(
            AzureStorageClient azureStorageClient,
            MessageManager messageManager)
        {
            this.azureStorageClient = azureStorageClient;
            this.messageManager = messageManager;
            this.settings = this.azureStorageClient.Settings;
            this.stats = this.azureStorageClient.Stats;
            this.taskHubName = this.settings.TaskHubName;
            this.storageAccountName = this.azureStorageClient.TableAccountName;

            string historyTableName = this.settings.HistoryTableName;
            string instancesTableName = this.settings.InstanceTableName;

            this.HistoryTable = this.azureStorageClient.GetTableReference(historyTableName);
            this.InstancesTable = this.azureStorageClient.GetTableReference(instancesTableName);

            // Use reflection to learn all the different event types supported by DTFx.
            // This could have been hardcoded, but I generally try to avoid hardcoding of point-in-time DTFx knowledge.
            Type historyEventType = typeof(HistoryEvent);

            IEnumerable<Type> historyEventTypes = historyEventType.Assembly.GetTypes().Where(
                t => !t.IsAbstract && t.IsSubclassOf(historyEventType));

            // Each concrete event type is instantiated without running a constructor purely to read
            // its EventType value, which becomes the dictionary key.
            this.eventTypeMap = historyEventTypes.ToDictionary(
                type => ((HistoryEvent)FormatterServices.GetUninitializedObject(type)).EventType);
        }

        // For testing
        internal AzureTableTrackingStore(
            AzureStorageOrchestrationServiceStats stats,
            Table instancesTable)
        {
            this.stats = stats;
            this.InstancesTable = instancesTable;
            this.settings = new AzureStorageOrchestrationServiceSettings
            {
                // Have to set FetchLargeMessageDataEnabled to false, as no MessageManager is
                // instantiated for this test.
                FetchLargeMessageDataEnabled = false,
            };
        }

        /// <summary>Table holding the orchestration history rows.</summary>
        internal Table HistoryTable { get; }

        /// <summary>Table holding one status row per orchestration instance.</summary>
        internal Table InstancesTable { get; }

        /// <inheritdoc />
        public override Task CreateAsync(CancellationToken cancellationToken = default)
        {
            // Both tables are created concurrently.
            return Task.WhenAll(new Task[]
            {
                this.HistoryTable.CreateIfNotExistsAsync(cancellationToken),
                this.InstancesTable.CreateIfNotExistsAsync(cancellationToken)
            });
        }

        /// <inheritdoc />
        public override Task DeleteAsync(CancellationToken cancellationToken = default)
        {
            // Both tables are deleted concurrently.
            return Task.WhenAll(new Task[]
            {
                this.HistoryTable.DeleteIfExistsAsync(cancellationToken),
                this.InstancesTable.DeleteIfExistsAsync(cancellationToken)
            });
        }

        /// <inheritdoc />
        public override async Task<bool> ExistsAsync(CancellationToken cancellationToken = default)
        {
            // Short-circuits: the instances table is only probed when the history table exists.
            return this.HistoryTable != null
                && this.InstancesTable != null
                && await this.HistoryTable.ExistsAsync(cancellationToken)
                && await this.InstancesTable.ExistsAsync(cancellationToken);
        }

        /// <inheritdoc />
        public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string instanceId, string expectedExecutionId, CancellationToken cancellationToken = default)
        {
            TableQueryResults<TableEntity> results = await this
                .GetHistoryEntitiesResponseInfoAsync(instanceId, expectedExecutionId, null, cancellationToken)
                .GetResultsAsync(cancellationToken: cancellationToken);

            // The sentinel row should always be the last row
            TableEntity sentinel = results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);

            IList<HistoryEvent> historyEvents;
            string executionId;
            TrackingStoreContext trackingStoreContext = new TrackingStoreContext();

            // If expectedExecutionId is provided but it does not match the sentinel executionId,
            // it may belong to a previous generation. In that case, treat it as an unknown executionId
            // and skip loading history.
            if (results.Entities.Count > 0 && (expectedExecutionId == null || expectedExecutionId == sentinel?.GetString("ExecutionId")))
            {
                // The most recent generation will always be in the first history event.
                executionId = sentinel?.GetString("ExecutionId") ?? results.Entities[0].GetString("ExecutionId");

                // Convert the table entities into history events.
                var events = new List<HistoryEvent>(results.Entities.Count);
                foreach (TableEntity entity in results.Entities)
                {
                    if (entity.GetString("ExecutionId") != executionId)
                    {
                        // The remaining entities are from a previous generation and can be discarded.
                        break;
                    }

                    // The sentinel row does not contain any history events, so ignore and continue
                    if (entity == sentinel)
                    {
                        continue;
                    }

                    // Some entity properties may be stored in blob storage.
                    await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs, cancellationToken);

                    events.Add((HistoryEvent)TableEntityConverter.Deserialize(entity, GetTypeForTableEntity(entity)));
                }

                historyEvents = events;
            }
            else
            {
                historyEvents = Array.Empty<HistoryEvent>();
                executionId = expectedExecutionId;
            }

            // Read the checkpoint completion time from the sentinel row.
            // A sentinel won't exist only if no instance of this ID has ever existed or the instance history
            // was purged. The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
            DateTime checkpointCompletionTime = DateTime.MinValue;
            ETag? eTagValue = sentinel?.ETag;
            if (sentinel != null
                && sentinel.TryGetValue(CheckpointCompletedTimestampProperty, out object timestampObj)
                && timestampObj is DateTimeOffset timestampProperty)
            {
                checkpointCompletionTime = timestampProperty.DateTime;
            }

            int currentEpisodeNumber = Utils.GetEpisodeNumber(historyEvents);

            this.settings.Logger.FetchedInstanceHistory(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                executionId,
                historyEvents.Count,
                currentEpisodeNumber,
                results.RequestCount,
                results.ElapsedMilliseconds,
                eTagValue?.ToString(),
                checkpointCompletionTime);

            return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext);
        }

        /// <summary>
        /// Builds the history-table query for one instance. When <paramref name="expectedExecutionId"/> is
        /// non-empty, the filter is narrowed to the sentinel row plus rows of that execution.
        /// </summary>
        TableQueryResponse<TableEntity> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken)
        {
            string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'";
            if (!string.IsNullOrEmpty(expectedExecutionId))
            {
                filter += $" and ({nameof(ITableEntity.RowKey)} eq '{SentinelRowKey}' or {nameof(OrchestrationInstance.ExecutionId)} eq '{expectedExecutionId}')";
            }

            return this.HistoryTable.ExecuteQueryAsync<TableEntity>(filter, select: projectionColumns, cancellationToken: cancellationToken);
        }

        /// <summary>
        /// Runs <paramref name="filter"/> against the history table, logs the fetch, and returns the matching rows.
        /// </summary>
        async Task<IReadOnlyList<TableEntity>> QueryHistoryAsync(string filter, string instanceId, CancellationToken cancellationToken)
        {
            TableQueryResults<TableEntity> results = await this
                .HistoryTable.ExecuteQueryAsync<TableEntity>(filter, cancellationToken: cancellationToken)
                .GetResultsAsync(cancellationToken: cancellationToken);

            IReadOnlyList<TableEntity> entities = results.Entities;

            string executionId = entities.FirstOrDefault()?.GetString(nameof(OrchestrationInstance.ExecutionId)) ?? string.Empty;
            this.settings.Logger.FetchedInstanceHistory(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                executionId,
                entities.Count,
                episode: -1, // We don't have enough information to get the episode number. It's also not important to have for this particular trace.
                results.RequestCount,
                results.ElapsedMilliseconds,
                eTag: string.Empty,
                DateTime.MinValue);

            return entities;
        }

        /// <summary>
        /// Rewrites the failure rows of the most recent execution of <paramref name="instanceId"/> so that it
        /// can be restarted, recursing into failed sub-orchestrations.
        /// </summary>
        /// <returns>
        /// The instance IDs yielded by recursive calls for failed sub-orchestrations, followed by
        /// <paramref name="instanceId"/> itself when no failed sub-orchestration was encountered.
        /// </returns>
        public override async IAsyncEnumerable<string> RewindHistoryAsync(string instanceId, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            // REWIND ALGORITHM:
            // 1. Finds failed execution of specified orchestration instance to rewind
            // 2. Finds failure entities to clear and over-writes them (as well as corresponding trigger events)
            // 3. Identifies sub-orchestration failure(s) from parent instance and calls RewindHistoryAsync recursively on failed sub-orchestration child instance(s)
            // 4. Resets orchestration status of rewound instance in instance store table to prepare it to be restarted
            // 5. Returns "failedLeaves", a list of the deepest failed instances on each failed branch to revive with RewindEvent messages
            //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

            bool hasFailedSubOrchestrations = false;
            string partitionFilter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'";

            string orchestratorStartedFilter = $"{partitionFilter} and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.OrchestratorStarted)}'";
            IReadOnlyList<TableEntity> orchestratorStartedEntities = await this.QueryHistoryAsync(orchestratorStartedFilter, instanceId, cancellationToken);

            // get most recent orchestratorStarted event
            // NOTE(review): when the query returns no rows, the [0] indexer below throws; confirm whether
            // callers guarantee that at least one OrchestratorStarted row exists.
            string recentStartRowKey = orchestratorStartedEntities.Max(x => x.RowKey);
            var recentStartRow = orchestratorStartedEntities.Where(y => y.RowKey == recentStartRowKey).ToList();
            string executionId = recentStartRow[0].GetString(nameof(OrchestrationInstance.ExecutionId));

            string executionIdFilter = $"{nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'";

            var updateFilterBuilder = new StringBuilder();
            updateFilterBuilder.Append($"{partitionFilter}");
            updateFilterBuilder.Append($" and {executionIdFilter}");
            updateFilterBuilder.Append(" and (");
            updateFilterBuilder.Append($"{nameof(ExecutionCompletedEvent.OrchestrationStatus)} eq '{nameof(OrchestrationStatus.Failed)}'");
            updateFilterBuilder.Append($" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.TaskFailed)}'");
            updateFilterBuilder.Append($" or {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.SubOrchestrationInstanceFailed)}'");
            updateFilterBuilder.Append(')');

            IReadOnlyList<TableEntity> entitiesToClear = await this.QueryHistoryAsync(updateFilterBuilder.ToString(), instanceId, cancellationToken);

            foreach (TableEntity entity in entitiesToClear)
            {
                if (entity.GetString(nameof(OrchestrationInstance.ExecutionId)) != executionId)
                {
                    // the remaining entities are from a previous generation and can be discarded.
                    break;
                }

                if (entity.RowKey == SentinelRowKey)
                {
                    continue;
                }

                int? taskScheduledId = entity.GetInt32(nameof(TaskCompletedEvent.TaskScheduledId));

                var eventFilterBuilder = new StringBuilder();
                eventFilterBuilder.Append($"{partitionFilter}");
                eventFilterBuilder.Append($" and {executionIdFilter}");
                eventFilterBuilder.Append($" and {nameof(HistoryEvent.EventId)} eq {taskScheduledId.GetValueOrDefault()}");

                switch (entity.GetString(nameof(HistoryEvent.EventType)))
                {
                    // overwrite the TaskScheduled row corresponding to the TaskFailed event
                    case nameof(EventType.TaskFailed):
                        eventFilterBuilder.Append($" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.TaskScheduled)}'");
                        IReadOnlyList<TableEntity> taskScheduledEntities = await this.QueryHistoryAsync(eventFilterBuilder.ToString(), instanceId, cancellationToken);

                        // NOTE(review): assumes a matching TaskScheduled row exists; the indexer throws otherwise.
                        TableEntity tsEntity = taskScheduledEntities[0];
                        tsEntity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + tsEntity.GetString(nameof(HistoryEvent.EventType));
                        tsEntity[nameof(TaskFailedEvent.EventType)] = nameof(EventType.GenericEvent);
                        await this.HistoryTable.ReplaceEntityAsync(tsEntity, tsEntity.ETag, cancellationToken);
                        break;

                    // mark the SubOrchestrationInstanceCreated row corresponding to the SubOrchestrationInstanceFailed event
                    case nameof(EventType.SubOrchestrationInstanceFailed):
                        hasFailedSubOrchestrations = true;

                        eventFilterBuilder.Append($" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.SubOrchestrationInstanceCreated)}'");
                        IReadOnlyList<TableEntity> subOrchestrationEntities = await this.QueryHistoryAsync(eventFilterBuilder.ToString(), instanceId, cancellationToken);

                        // the SubOrchestrationCreatedEvent is still healthy and will not be overwritten, just marked as rewound
                        // NOTE(review): assumes a matching SubOrchestrationInstanceCreated row exists; the indexer throws otherwise.
                        TableEntity soEntity = subOrchestrationEntities[0];
                        soEntity[nameof(SubOrchestrationInstanceFailedEvent.Reason)] = "Rewound: " + soEntity.GetString(nameof(HistoryEvent.EventType));
                        await this.HistoryTable.ReplaceEntityAsync(soEntity, soEntity.ETag, cancellationToken);

                        // recursive call to clear out failure events on child instances
                        await foreach (string childInstanceId in this.RewindHistoryAsync(soEntity.GetString(nameof(OrchestrationInstance.InstanceId)), cancellationToken))
                        {
                            yield return childInstanceId;
                        }

                        break;
                }

                // "clear" failure event by making RewindEvent: replay ignores row while dummy event preserves rowKey
                entity[nameof(TaskFailedEvent.Reason)] = "Rewound: " + entity.GetString(nameof(HistoryEvent.EventType));
                entity[nameof(TaskFailedEvent.EventType)] = nameof(EventType.GenericEvent);

                await this.HistoryTable.ReplaceEntityAsync(entity, entity.ETag, cancellationToken);
            }

            // reset orchestration status in instance store table
            await this.UpdateStatusForRewindAsync(instanceId, cancellationToken);

            if (!hasFailedSubOrchestrations)
            {
                yield return instanceId;
            }
        }

        /// <inheritdoc />
        public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            // Yields at most one state; allExecutions is not consulted by this implementation.
            InstanceStatus instanceStatus = await this.FetchInstanceStatusInternalAsync(instanceId, fetchInput, cancellationToken);
            if (instanceStatus != null)
            {
                yield return instanceStatus.State;
            }
        }

#nullable enable
        /// <inheritdoc />
        public override async Task<OrchestrationState?> GetStateAsync(string instanceId, string executionId, bool fetchInput, CancellationToken cancellationToken = default)
        {
            // executionId is not consulted by this implementation; the lookup is by instance ID only.
            InstanceStatus? instanceStatus = await this.FetchInstanceStatusInternalAsync(instanceId, fetchInput, cancellationToken);
            return instanceStatus?.State;
        }

        /// <inheritdoc />
        public override Task<InstanceStatus?> FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default)
        {
            return this.FetchInstanceStatusInternalAsync(instanceId, fetchInput: false, cancellationToken);
        }

        /// <summary>
        /// Queries the instances table for the first row matching <paramref name="instanceId"/>, converts it to
        /// an <see cref="OrchestrationState"/>, and logs the lookup.
        /// </summary>
        /// <param name="instanceId">The orchestration instance ID. Must not be null.</param>
        /// <param name="fetchInput">Forwarded to the query condition's <c>FetchInput</c> setting.</param>
        /// <param name="cancellationToken">Token passed to the table query and the conversion.</param>
        /// <returns>The state paired with the row's ETag, or <c>null</c> when no row was found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="instanceId"/> is null.</exception>
        internal async Task<InstanceStatus?> FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput, CancellationToken cancellationToken)
        {
            if (instanceId == null)
            {
                throw new ArgumentNullException(nameof(instanceId));
            }

            var queryCondition = new OrchestrationInstanceStatusQueryCondition
            {
                InstanceId = instanceId,
                FetchInput = fetchInput,
            };

            ODataCondition odata = queryCondition.ToOData();

            // The stopwatch covers the table query only, not the conversion that follows.
            var sw = Stopwatch.StartNew();
            OrchestrationInstanceStatus? tableEntity = await this.InstancesTable
                .ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, cancellationToken)
                .FirstOrDefaultAsync();
            sw.Stop();

            OrchestrationState? orchestrationState = tableEntity != null
                ? await this.ConvertFromAsync(tableEntity, cancellationToken)
                : null;

            this.settings.Logger.FetchedInstanceStatus(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                orchestrationState?.OrchestrationInstance.ExecutionId ?? string.Empty,
                orchestrationState?.OrchestrationStatus.ToString() ?? "NotFound",
                sw.ElapsedMilliseconds);

            if (tableEntity == null || orchestrationState == null)
            {
                return null;
            }

            return new InstanceStatus(orchestrationState, tableEntity.ETag);
        }
#nullable disable

        /// <summary>
        /// Converts an instances-table row to an <see cref="OrchestrationState"/>, deriving the instance ID
        /// from the row's unescaped partition key.
        /// </summary>
        Task<OrchestrationState> ConvertFromAsync(OrchestrationInstanceStatus tableEntity, CancellationToken cancellationToken)
        {
            var instanceId = KeySanitation.UnescapePartitionKey(tableEntity.PartitionKey);
            return ConvertFromAsync(tableEntity, instanceId, cancellationToken);
        }

        /// <summary>
        /// Copies the fields of <paramref name="orchestrationInstanceStatus"/> into a new
        /// <see cref="OrchestrationState"/> and, when large-message fetching is enabled in the settings,
        /// resolves blob-backed input and output.
        /// </summary>
        async Task<OrchestrationState> ConvertFromAsync(OrchestrationInstanceStatus orchestrationInstanceStatus, string instanceId, CancellationToken cancellationToken)
        {
            var orchestrationState = new OrchestrationState();
            if (!Enum.TryParse(orchestrationInstanceStatus.RuntimeStatus, out orchestrationState.OrchestrationStatus))
            {
                // This is not expected, but could happen if there is invalid data in the Instances table.
                orchestrationState.OrchestrationStatus = (OrchestrationStatus)(-1);
            }

            orchestrationState.OrchestrationInstance = new OrchestrationInstance
            {
                InstanceId = instanceId,
                ExecutionId = orchestrationInstanceStatus.ExecutionId,
            };

            orchestrationState.Name = orchestrationInstanceStatus.Name;
            orchestrationState.Version = orchestrationInstanceStatus.Version;
            orchestrationState.Status = orchestrationInstanceStatus.CustomStatus;
            orchestrationState.CreatedTime = orchestrationInstanceStatus.CreatedTime;
            orchestrationState.CompletedTime = orchestrationInstanceStatus.CompletedTime.GetValueOrDefault();
            orchestrationState.LastUpdatedTime = orchestrationInstanceStatus.LastUpdatedTime;
            orchestrationState.Input = orchestrationInstanceStatus.Input;
            orchestrationState.Output = orchestrationInstanceStatus.Output;
            orchestrationState.ScheduledStartTime = orchestrationInstanceStatus.ScheduledStartTime;
            orchestrationState.Generation = orchestrationInstanceStatus.Generation;
            orchestrationState.Tags = !string.IsNullOrEmpty(orchestrationInstanceStatus.Tags)
                ? TagsSerializer.Deserialize(orchestrationInstanceStatus.Tags)
                : null;

            if (this.settings.FetchLargeMessageDataEnabled)
            {
                if (MessageManager.TryGetLargeMessageReference(orchestrationState.Input, out Uri blobUrl))
                {
                    string json = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl, cancellationToken);

                    // Depending on which blob this is, we interpret it differently.
                    // Ordinal comparison: the blob path is an identifier, not linguistic text.
                    if (blobUrl.AbsolutePath.EndsWith("ExecutionStarted.json.gz", StringComparison.Ordinal))
                    {
                        // The downloaded content is an ExecutedStarted message payload that
                        // was created when the orchestration was started.
                        MessageData msg = this.messageManager.DeserializeMessageData(json);
                        if (msg?.TaskMessage?.Event is ExecutionStartedEvent startEvent)
                        {
                            orchestrationState.Input = startEvent.Input;
                        }
                        else
                        {
                            this.settings.Logger.GeneralWarning(
                                this.storageAccountName,
                                this.taskHubName,
                                $"Orchestration input blob URL '{blobUrl}' contained unrecognized data.",
                                instanceId);
                        }
                    }
                    else
                    {
                        // The downloaded content is the raw input JSON
                        orchestrationState.Input = json;
                    }
                }

                orchestrationState.Output = await this.messageManager.FetchLargeMessageIfNecessary(orchestrationState.Output, cancellationToken);
            }

            return orchestrationState;
        }

        /// <inheritdoc />
        public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(IEnumerable<string> instanceIds, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            if (instanceIds == null)
            {
                yield break;
            }

            // One lookup per instance ID; all lookups are awaited together and missing instances are skipped.
            IEnumerable<Task<OrchestrationState>> instanceQueries = instanceIds.Select(
                instance => this.GetStateAsync(instance, allExecutions: true, fetchInput: false, cancellationToken).SingleOrDefaultAsync().AsTask());

            foreach (OrchestrationState state in await Task.WhenAll(instanceQueries))
            {
                if (state != null)
                {
                    yield return state;
                }
            }
        }

        /// <inheritdoc />
        public override IAsyncEnumerable<OrchestrationState> GetStateAsync(CancellationToken cancellationToken = default)
        {
            return this.QueryStateAsync($"{nameof(ITableEntity.RowKey)} eq ''", cancellationToken: cancellationToken);
        }
/// <inheritdoc />
        public override AsyncPageable<OrchestrationState> GetStateAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, CancellationToken cancellationToken = default)
        {
            ODataCondition odata = OrchestrationInstanceStatusQueryCondition.Parse(createdTimeFrom, createdTimeTo, runtimeStatus).ToOData();
            return this.QueryStateAsync(odata.Filter, odata.Select, cancellationToken);
        }

        /// <inheritdoc />
        public override AsyncPageable<OrchestrationState> GetStateAsync(OrchestrationInstanceStatusQueryCondition condition, CancellationToken cancellationToken = default)
        {
            ODataCondition odata = condition.ToOData();
            return this.QueryStateAsync(odata.Filter, odata.Select, cancellationToken);
        }

        /// <summary>
        /// Queries the instances table with the given filter and projection and converts every returned row
        /// to an <see cref="OrchestrationState"/>.
        /// </summary>
        AsyncPageable<OrchestrationState> QueryStateAsync(string filter = null, IEnumerable<string> select = null, CancellationToken cancellationToken = default)
        {
            return this.InstancesTable
                .ExecuteQueryAsync<OrchestrationInstanceStatus>(filter, select: select, cancellationToken: cancellationToken)
                .TransformPagesAsync((page, pageToken) => page.Values
                    .SelectAsync((status, token) => new ValueTask<OrchestrationState>(
                        this.ConvertFromAsync(status, KeySanitation.UnescapePartitionKey(status.PartitionKey), token))));
        }

        /// <summary>
        /// Deletes all data for every instance matching the created-time range and runtime statuses.
        /// </summary>
        /// <returns>Totals of storage requests, instances deleted, and history rows deleted.</returns>
        async Task<PurgeHistoryResult> DeleteHistoryAsync(
            DateTime createdTimeFrom,
            DateTime? createdTimeTo,
            IEnumerable<OrchestrationStatus> runtimeStatus,
            CancellationToken cancellationToken)
        {
            var condition = OrchestrationInstanceStatusQueryCondition.Parse(
                createdTimeFrom,
                createdTimeTo,
                runtimeStatus);
            condition.FetchInput = false;
            condition.FetchOutput = false;

            ODataCondition odata = condition.ToOData();

            int storageRequests = 0;
            int instancesDeleted = 0;
            int rowsDeleted = 0;

            AsyncPageable<OrchestrationInstanceStatus> entitiesPageable = this.InstancesTable.ExecuteQueryAsync<OrchestrationInstanceStatus>(
                odata.Filter,
                select: odata.Select,
                cancellationToken: cancellationToken);

            // Limit to batches of 100 to avoid excessive memory usage and table storage scanning
            await foreach (Page<OrchestrationInstanceStatus> page in entitiesPageable.AsPages(pageSizeHint: 100))
            {
                // The underlying client throttles
                await Task.WhenAll(page.Values.Select(async instance =>
                {
                    PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken);

                    // Deletions in a page run concurrently, so the totals are accumulated atomically.
                    Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
                    Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests);
                    Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
                }));
            }

            return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted);
        }

        /// <summary>
        /// Deletes the large-message blobs, the history rows, and the instances-table row of one instance.
        /// </summary>
        /// <returns>The storage requests issued, an instance count of 1, and the number of history rows deleted.</returns>
        async Task<PurgeHistoryResult> DeleteAllDataForOrchestrationInstance(OrchestrationInstanceStatus orchestrationInstanceStatus, CancellationToken cancellationToken)
        {
            int storageRequests = 0;
            int rowsDeleted = 0;

            // The partition key is stored escaped; the helpers below take the unescaped instance ID.
            string instanceId = KeySanitation.UnescapePartitionKey(orchestrationInstanceStatus.PartitionKey);

            TableQueryResults<TableEntity> results = await this
                .GetHistoryEntitiesResponseInfoAsync(
                    instanceId: instanceId,
                    expectedExecutionId: null,
                    projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty, TimestampProperty },
                    cancellationToken)
                .GetResultsAsync(cancellationToken: cancellationToken);

            storageRequests += results.RequestCount;

            IReadOnlyList<TableEntity> historyEntities = results.Entities;

            // The three deletions run concurrently.
            var tasks = new List<Task>
            {
                Task.Run(async () =>
                {
                    int storageOperations = await this.messageManager.DeleteLargeMessageBlobs(instanceId, cancellationToken);
                    Interlocked.Add(ref storageRequests, storageOperations);
                }),
                Task.Run(async () =>
                {
                    var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchAsync(historyEntities, cancellationToken);
                    Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count);
                    Interlocked.Add(ref storageRequests, deletedEntitiesResponseInfo.RequestCount);
                }),
                this.InstancesTable.DeleteEntityAsync(
                    new TableEntity(orchestrationInstanceStatus.PartitionKey, string.Empty),
                    ETag.All,
                    cancellationToken: cancellationToken)
            };

            await Task.WhenAll(tasks);

            // This is for the instances table deletion
            storageRequests++;

            return new PurgeHistoryResult(storageRequests, 1, rowsDeleted);
        }

        /// <inheritdoc />
        /// <exception cref="NotSupportedException">Always thrown by this implementation.</exception>
        public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType, CancellationToken cancellationToken = default)
        {
            throw new NotSupportedException();
        }

        /// <inheritdoc />
        public override async Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string instanceId, CancellationToken cancellationToken = default)
        {
            string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);

            string filter = $"{PartitionKeyProperty} eq '{sanitizedInstanceId}' and {RowKeyProperty} eq ''";

            var results = await this.InstancesTable
                .ExecuteQueryAsync<OrchestrationInstanceStatus>(filter, cancellationToken: cancellationToken)
                .GetResultsAsync(cancellationToken: cancellationToken);

            OrchestrationInstanceStatus orchestrationInstanceStatus = results.Entities.FirstOrDefault();

            // Nothing to purge when the instance has no row in the instances table.
            if (orchestrationInstanceStatus == null)
            {
                return new PurgeHistoryResult(0, 0, 0);
            }

            PurgeHistoryResult result = await this.DeleteAllDataForOrchestrationInstance(orchestrationInstanceStatus, cancellationToken);

            this.settings.Logger.PurgeInstanceHistory(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                DateTime.MinValue.ToString(),
                DateTime.MinValue.ToString(),
                string.Empty,
                result.StorageRequests,
                result.InstancesDeleted,
                results.ElapsedMilliseconds);

            return result;
        }

        /// <inheritdoc />
        public override async Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(
            DateTime createdTimeFrom,
            DateTime? createdTimeTo,
            IEnumerable<OrchestrationStatus> runtimeStatus,
            CancellationToken cancellationToken = default)
        {
            Stopwatch stopwatch = Stopwatch.StartNew();

            // Only these four statuses are passed on to the delete query; any others are dropped.
            List<OrchestrationStatus> runtimeStatusList = runtimeStatus?.Where(
                status => status == OrchestrationStatus.Completed ||
                    status == OrchestrationStatus.Terminated ||
                    status == OrchestrationStatus.Canceled ||
                    status == OrchestrationStatus.Failed).ToList();

            PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken);

            this.settings.Logger.PurgeInstanceHistory(
                this.storageAccountName,
                this.taskHubName,
                string.Empty,
                createdTimeFrom.ToString(),
                // Nullable<T>.ToString() returns "" (never null) for a missing value, so the fallback
                // has to be applied to the value itself rather than to the string.
                (createdTimeTo ?? DateTime.MinValue).ToString(),
                runtimeStatus != null
                    ? string.Join(",", runtimeStatus.Select(x => x.ToString()))
                    : string.Empty,
                result.StorageRequests,
                result.InstancesDeleted,
                stopwatch.ElapsedMilliseconds);

            return result;
        }

        /// <inheritdoc />
        public override async Task<bool> SetNewExecutionAsync(
            ExecutionStartedEvent executionStartedEvent,
            ETag? eTag,
            string inputPayloadOverride,
            CancellationToken cancellationToken = default)
        {
            string sanitizedInstanceId = KeySanitation.EscapePartitionKey(executionStartedEvent.OrchestrationInstance.InstanceId);
            TableEntity entity = new TableEntity(sanitizedInstanceId, "")
            {
                ["Input"] = inputPayloadOverride ?? executionStartedEvent.Input,
                ["CreatedTime"] = executionStartedEvent.Timestamp,
                ["Name"] = executionStartedEvent.Name,
                ["Version"] = executionStartedEvent.Version,
                ["RuntimeStatus"] = OrchestrationStatus.Pending.ToString("G"),
                ["LastUpdatedTime"] = DateTime.UtcNow,
                ["TaskHubName"] = this.settings.TaskHubName,
                ["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime,
                ["ExecutionId"] = executionStartedEvent.OrchestrationInstance.ExecutionId,
                ["Generation"] = executionStartedEvent.Generation,
                ["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags),
            };

            // It is possible that the queue message was small enough to be written directly to a queue message,
            // not a blob, but is too large to be written to a table property.
            await this.CompressLargeMessageAsync(entity, listOfBlobs: null, cancellationToken: cancellationToken);

            Stopwatch stopwatch = Stopwatch.StartNew();
            try
            {
                if (eTag == null)
                {
                    // This is the case for creating a new instance.
                    await this.InstancesTable.InsertEntityAsync(entity, cancellationToken);
                }
                else
                {
                    // This is the case for overwriting an existing instance.
                    await this.InstancesTable.ReplaceEntityAsync(entity, eTag.GetValueOrDefault(), cancellationToken);
                }
            }
            catch (DurableTaskStorageException e) when (
                e.HttpStatusCode == 409 /* Conflict */
                || e.HttpStatusCode == 412 /* Precondition failed */)
            {
                // Ignore. The main scenario for this is handling race conditions in status update.
                return false;
            }

            // Episode 0 means the orchestrator hasn't started yet.
            int currentEpisodeNumber = 0;

            this.settings.Logger.InstanceStatusUpdate(
                this.storageAccountName,
                this.taskHubName,
                executionStartedEvent.OrchestrationInstance.InstanceId,
                executionStartedEvent.OrchestrationInstance.ExecutionId,
                OrchestrationStatus.Pending,
                currentEpisodeNumber,
                stopwatch.ElapsedMilliseconds);

            return true;
        }

        /// <inheritdoc />
        public override async Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default)
        {
            string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);

            // Only these two properties are merged into the existing row; ETag.All makes the merge unconditional.
            TableEntity entity = new TableEntity(sanitizedInstanceId, "")
            {
                ["RuntimeStatus"] = OrchestrationStatus.Pending.ToString("G"),
                ["LastUpdatedTime"] = DateTime.UtcNow,
            };

            Stopwatch stopwatch = Stopwatch.StartNew();
            await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken);

            // We don't have enough information to get the episode number.
            // It's also not important to have for this particular trace.
            int currentEpisodeNumber = 0;

            this.settings.Logger.InstanceStatusUpdate(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                string.Empty,
                OrchestrationStatus.Pending,
                currentEpisodeNumber,
                stopwatch.ElapsedMilliseconds);
        }

        /// <inheritdoc />
        public override Task StartAsync(CancellationToken cancellationToken = default)
        {
            // Turns off Nagle's algorithm on the service points of both table endpoints.
            ServicePointManager.FindServicePoint(this.HistoryTable.Uri).UseNagleAlgorithm = false;
            ServicePointManager.FindServicePoint(this.InstancesTable.Uri).UseNagleAlgorithm = false;
            return Task.CompletedTask;
        }

        /// <inheritdoc />
        public override async Task<ETag?> UpdateStateAsync(
            OrchestrationRuntimeState newRuntimeState,
            OrchestrationRuntimeState oldRuntimeState,
            string instanceId,
            string executionId,
            ETag?
eTagValue, object trackingStoreContext, CancellationToken cancellationToken = default) { int estimatedBytes = 0; IList<HistoryEvent> newEvents = newRuntimeState.NewEvents; IList<HistoryEvent> allEvents = newRuntimeState.Events; TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState); var newEventListBuffer = new StringBuilder(4000); var historyEventBatch = new List<TableTransactionAction>(); OrchestrationStatus runtimeStatus = OrchestrationStatus.Running; string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) { // TODO: Translating null to "null" is a temporary workaround. We should prioritize // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. ["CustomStatus"] = newRuntimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = newEvents.Last().Timestamp, }; // check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario List<string> blobsToDelete = null; if (oldRuntimeState != newRuntimeState && context.Blobs.Count > 0) { blobsToDelete = context.Blobs; context.Blobs = new List<string>(); } for (int i = 0; i < newEvents.Count; i++) { bool isFinalEvent = i == newEvents.Count - 1; HistoryEvent historyEvent = newEvents[i]; // For backwards compatibility, we convert timer timestamps to UTC prior to persisting to Azure Storage // see: https://github.com/Azure/durabletask/pull/1138 Utils.ConvertDateTimeInHistoryEventsToUTC(historyEvent); var historyEntity = TableEntityConverter.Serialize(historyEvent); historyEntity.PartitionKey = sanitizedInstanceId; newEventListBuffer.Append(historyEvent.EventType.ToString()).Append(','); // The row key is the sequence number, which represents the chronological ordinal of the event. 
long sequenceNumber = i + (allEvents.Count - newEvents.Count); historyEntity.RowKey = sequenceNumber.ToString("X16"); historyEntity["ExecutionId"] = executionId; await this.CompressLargeMessageAsync(historyEntity, context.Blobs, cancellationToken); // Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below. historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, historyEntity)); // Keep track of the byte count to ensure we don't hit the 4 MB per-batch maximum estimatedBytes += GetEstimatedByteCount(historyEntity); // Monitor for orchestration instance events switch (historyEvent.EventType) { case EventType.ExecutionStarted: runtimeStatus = OrchestrationStatus.Running; ExecutionStartedEvent executionStartedEvent = (ExecutionStartedEvent)historyEvent; instanceEntity["Name"] = executionStartedEvent.Name; instanceEntity["Version"] = executionStartedEvent.Version; instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); if (executionStartedEvent.ScheduledStartTime.HasValue) { instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; } this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionStartedEvent.Input), instancePropertyName: InputProperty, data: executionStartedEvent.Input); break; case EventType.ExecutionCompleted: ExecutionCompletedEvent executionCompleted = (ExecutionCompletedEvent)historyEvent; runtimeStatus = executionCompleted.OrchestrationStatus; instanceEntity["RuntimeStatus"] = executionCompleted.OrchestrationStatus.ToString(); instanceEntity["CompletedTime"] = DateTime.UtcNow; this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionCompleted.Result), instancePropertyName: OutputProperty, data: 
executionCompleted.FailureDetails?.ToString() ?? executionCompleted.Result); break; case EventType.ExecutionTerminated: runtimeStatus = OrchestrationStatus.Terminated; ExecutionTerminatedEvent executionTerminatedEvent = (ExecutionTerminatedEvent)historyEvent; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString(); instanceEntity["CompletedTime"] = DateTime.UtcNow; this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionTerminatedEvent.Input), instancePropertyName: OutputProperty, data: executionTerminatedEvent.Input); break; case EventType.ExecutionSuspended: runtimeStatus = OrchestrationStatus.Suspended; ExecutionSuspendedEvent executionSuspendedEvent = (ExecutionSuspendedEvent)historyEvent; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Suspended.ToString(); this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionSuspendedEvent.Reason), instancePropertyName: OutputProperty, data: executionSuspendedEvent.Reason); break; case EventType.ExecutionResumed: runtimeStatus = OrchestrationStatus.Running; ExecutionResumedEvent executionResumedEvent = (ExecutionResumedEvent)historyEvent; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionResumedEvent.Reason), instancePropertyName: OutputProperty, data: executionResumedEvent.Reason); break; case EventType.ContinueAsNew: runtimeStatus = OrchestrationStatus.ContinuedAsNew; ExecutionCompletedEvent executionCompletedEvent = (ExecutionCompletedEvent)historyEvent; instanceEntity["RuntimeStatus"] = OrchestrationStatus.ContinuedAsNew.ToString(); this.SetInstancesTablePropertyFromHistoryProperty( historyEntity, instanceEntity, historyPropertyName: nameof(executionCompletedEvent.Result), instancePropertyName: OutputProperty, data: 
executionCompletedEvent.Result); break; } // Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time. if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */) { eTagValue = await this.UploadHistoryBatch( instanceId, sanitizedInstanceId, executionId, historyEventBatch, newEventListBuffer, allEvents.Count, episodeNumber, estimatedBytes, eTagValue, isFinalBatch: isFinalEvent, cancellationToken: cancellationToken); // Reset local state for the next batch newEventListBuffer.Clear(); historyEventBatch.Clear(); estimatedBytes = 0; } } // First persistence step is to commit history to the history table. Messages must come after. if (historyEventBatch.Count > 0) { eTagValue = await this.UploadHistoryBatch( instanceId, sanitizedInstanceId, executionId, historyEventBatch, newEventListBuffer, allEvents.Count, episodeNumber, estimatedBytes, eTagValue, isFinalBatch: true, cancellationToken: cancellationToken); } Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); this.settings.Logger.InstanceStatusUpdate( this.storageAccountName, this.taskHubName, instanceId, executionId, runtimeStatus, episodeNumber, orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); // finally, delete orphaned blobs from the previous execution history. // We had to wait until the new history has committed to make sure the blobs are no longer necessary. 
if (blobsToDelete != null) { var tasks = new List<Task>(blobsToDelete.Count); foreach (var blobName in blobsToDelete) { tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); } await Task.WhenAll(tasks); } return eTagValue; } static int GetEstimatedByteCount(TableEntity entity) { // Assume at least 1 KB of data per entity to account for static-length properties int estimatedByteCount = 1024; // Count the bytes for variable-length properties, which are assumed to always be strings foreach (string propertyName in VariableSizeEntityProperties) { if (entity.TryGetValue(propertyName, out object property) && property is string stringProperty && stringProperty != "") { estimatedByteCount += Encoding.Unicode.GetByteCount(stringProperty); } } return estimatedByteCount; } Type GetTypeForTableEntity(TableEntity tableEntity) { string propertyName = nameof(HistoryEvent.EventType); if (!tableEntity.TryGetValue(propertyName, out object eventTypeProperty)) { throw new ArgumentException($"The TableEntity did not contain a '{propertyName}' property."); } if (eventTypeProperty is not string stringProperty) { throw new ArgumentException($"The TableEntity's {propertyName} property type must a String."); } if (!Enum.TryParse(stringProperty, out EventType eventType)) { throw new ArgumentException($"{stringProperty} is not a valid EventType value."); } return this.eventTypeMap[eventType]; } // Assigns the target table entity property. Any large message for type 'Input, or 'Output' would have been compressed earlier as part of the 'entity' object, // so, we only need to assign the 'entity' object's blobName to the target table entity blob name property. 
void SetInstancesTablePropertyFromHistoryProperty(
    TableEntity TableEntity,
    TableEntity instanceEntity,
    string historyPropertyName,
    string instancePropertyName,
    string data)
{
    // NOTE: the first parameter shares its name with the TableEntity type; inside this method the
    // identifier refers to the parameter (the history entity), not to the type.
    string blobPropertyName = GetBlobPropertyName(historyPropertyName);
    if (TableEntity.TryGetValue(blobPropertyName, out object blobProperty) && blobProperty is string blobName)
    {
        // This is a large message
        string blobUrl = this.messageManager.GetBlobUrl(blobName);
        instanceEntity[instancePropertyName] = blobUrl;
    }
    else
    {
        // This is a normal-sized message and can be stored inline
        instanceEntity[instancePropertyName] = data;
    }
}

/// <summary>
/// Moves every oversized string property of <paramref name="entity"/> (see
/// <see cref="ExceedsMaxTablePropertySize"/>) into a compressed blob, replacing the inline value with an
/// empty string and adding a "*BlobName" property that holds the blob name.
/// </summary>
/// <param name="entity">The entity to inspect and modify in place.</param>
/// <param name="listOfBlobs">Optional list that receives the name of every blob created; may be <c>null</c>.</param>
/// <param name="cancellationToken">Token forwarded to the blob upload.</param>
async Task CompressLargeMessageAsync(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
{
    foreach (string propertyName in VariableSizeEntityProperties)
    {
        if (entity.TryGetValue(propertyName, out object property) &&
            property is string stringProperty &&
            this.ExceedsMaxTablePropertySize(stringProperty))
        {
            // Upload the large property as a blob in Blob Storage since it won't fit in table storage.
            string blobName = GetBlobName(entity, propertyName);
            byte[] messageBytes = Encoding.UTF8.GetBytes(stringProperty);
            await this.messageManager.CompressAndUploadAsBytesAsync(messageBytes, blobName, cancellationToken);

            // Clear out the original property value and create a new "*BlobName"-suffixed property.
            // The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob.
            string blobPropertyName = GetBlobPropertyName(propertyName);
            entity.Add(blobPropertyName, blobName);
            entity[propertyName] = string.Empty;

            // if necessary, keep track of all the blobs associated with this execution
            listOfBlobs?.Add(blobName);
        }
    }
}

/// <summary>
/// Reverses <see cref="CompressLargeMessageAsync"/>: for every "*BlobName" property found on
/// <paramref name="entity"/>, downloads the blob, stores its content under the original property name
/// and removes the "*BlobName" property.
/// </summary>
/// <param name="entity">The entity to inspect and modify in place.</param>
/// <param name="listOfBlobs">Optional list that receives the name of every blob read; may be <c>null</c>.</param>
/// <param name="cancellationToken">Token forwarded to the blob download.</param>
async Task DecompressLargeEntityProperties(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
{
    // Check for entity properties stored in blob storage
    foreach (string propertyName in VariableSizeEntityProperties)
    {
        string blobPropertyName = GetBlobPropertyName(propertyName);
        if (entity.TryGetValue(blobPropertyName, out object property) && property is string blobName)
        {
            string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName, cancellationToken);
            entity[propertyName] = decompressedMessage;
            entity.Remove(blobPropertyName);

            // keep track of all the blobs associated with this execution
            // (null-tolerant, matching CompressLargeMessageAsync)
            listOfBlobs?.Add(blobName);
        }
    }
}

/// <summary>
/// Returns the name of the companion property that stores the blob name for
/// <paramref name="originalPropertyName"/>.
/// </summary>
static string GetBlobPropertyName(string originalPropertyName)
{
    // WARNING: Changing this is a breaking change!
    return originalPropertyName + "BlobName";
}

/// <summary>
/// Builds the blob name used to offload <paramref name="property"/> of <paramref name="entity"/>.
/// The name has the shape
/// <c>{PartitionKey}/history-{RowKey}-{eventType}-{random:X8}-{property}.json.gz</c>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The entity has no string <c>EventType</c> property and <paramref name="property"/> is neither
/// "Input" nor "Tags".
/// </exception>
static string GetBlobName(TableEntity entity, string property)
{
    string sanitizedInstanceId = entity.PartitionKey;
    string sequenceNumber = entity.RowKey;

    string eventType;
    if (entity.TryGetValue("EventType", out object obj) && obj is string value)
    {
        eventType = value;
    }
    else if (property == "Input")
    {
        // This message is just to start the orchestration, so it does not have a corresponding
        // EventType. Use a hardcoded value to record the orchestration input.
        eventType = "Input";
    }
    else if (property == "Tags")
    {
        eventType = "Tags";
    }
    else
    {
        throw new InvalidOperationException($"Could not compute the blob name for property {property}");
    }

    // randomize the blob name to prevent accidental races in split-brain situations (#890)
    // A GUID is used as the entropy source instead of `new Random()`: on .NET Framework a
    // default-constructed Random is seeded from the system clock, so instances created within the
    // same tick produce the same value, and Random.Next() never sets the top bit.
    uint random = BitConverter.ToUInt32(Guid.NewGuid().ToByteArray(), 0);

    return $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{random:X8}-{property}.json.gz";
}

/// <summary>
/// Appends the sentinel entity to <paramref name="historyEventBatch"/>, executes the batch against the
/// history table and returns the ETag reported for the sentinel row.
/// </summary>
/// <remarks>
/// The sentinel is added with <c>UpdateMerge</c> (conditioned on <paramref name="eTagValue"/>) when an ETag
/// is supplied, and with <c>Add</c> otherwise. Note that the supplied batch list is mutated.
/// A <see cref="DurableTaskStorageException"/> is always rethrown; HTTP 412 is additionally traced as a
/// split-brain detection.
/// </remarks>
async Task<ETag?> UploadHistoryBatch(
    string instanceId,
    string sanitizedInstanceId,
    string executionId,
    IList<TableTransactionAction> historyEventBatch,
    StringBuilder historyEventNamesBuffer,
    int numberOfTotalEvents,
    int episodeNumber,
    int estimatedBatchSizeInBytes,
    ETag? eTagValue,
    bool isFinalBatch,
    CancellationToken cancellationToken)
{
    // Adding / updating sentinel entity
    TableEntity sentinelEntity = new TableEntity(sanitizedInstanceId, SentinelRowKey)
    {
        ["ExecutionId"] = executionId,
        [IsCheckpointCompleteProperty] = isFinalBatch,
    };

    if (isFinalBatch)
    {
        sentinelEntity[CheckpointCompletedTimestampProperty] = DateTime.UtcNow;
    }

    if (eTagValue != null)
    {
        historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpdateMerge, sentinelEntity, eTagValue.GetValueOrDefault()));
    }
    else
    {
        historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.Add, sentinelEntity));
    }

    // Comma-separated event names with the trailing comma removed. Computed once for both traces;
    // the Math.Max guard keeps an empty buffer from throwing ArgumentOutOfRangeException.
    string historyEventNames = historyEventNamesBuffer.ToString(0, Math.Max(0, historyEventNamesBuffer.Length - 1));

    TableTransactionResults resultInfo;
    Stopwatch stopwatch = Stopwatch.StartNew();
    try
    {
        resultInfo = await this.HistoryTable.ExecuteBatchAsync(historyEventBatch, cancellationToken);
    }
    catch (DurableTaskStorageException ex)
    {
        if (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
        {
            this.settings.Logger.SplitBrainDetected(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                executionId,
                historyEventBatch.Count - 1, // exclude sentinel from count
                numberOfTotalEvents,
                historyEventNames,
                stopwatch.ElapsedMilliseconds,
                eTagValue?.ToString());
        }

        throw;
    }

    // Walk backwards: the sentinel was appended last, so it is normally found on the first iteration.
    IReadOnlyList<Response> responses = resultInfo.Responses;
    ETag? newETagValue = null;
    for (int i = responses.Count - 1; i >= 0; i--)
    {
        if (historyEventBatch[i].Entity.RowKey == SentinelRowKey)
        {
            newETagValue = responses[i].Headers.ETag;
            break;
        }
    }

    this.settings.Logger.AppendedInstanceHistory(
        this.storageAccountName,
        this.taskHubName,
        instanceId,
        executionId,
        historyEventBatch.Count - 1, // exclude sentinel from count
        numberOfTotalEvents,
        historyEventNames,
        episodeNumber,
        resultInfo.ElapsedMilliseconds,
        estimatedBatchSizeInBytes,
        string.Concat(eTagValue?.ToString() ?? "(null)", " --> ", newETagValue?.ToString() ?? "(null)"),
        isFinalBatch);

    return newETagValue;
}

/// <summary>
/// Returns <c>true</c> when <paramref name="data"/> is non-empty and its UTF-16 encoding is larger than
/// <see cref="MaxTablePropertySizeInBytes"/>.
/// </summary>
bool ExceedsMaxTablePropertySize(string data)
{
    return !string.IsNullOrEmpty(data)
        && Encoding.Unicode.GetByteCount(data) > MaxTablePropertySizeInBytes;
}

/// <summary>
/// Per-execution state passed through the tracking store: the names of the blobs associated with the
/// execution.
/// </summary>
class TrackingStoreContext
{
    public List<string> Blobs { get; set; } = new List<string>();
}
}
}