in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [809:1020]
/// <summary>
/// Persists the new history events of an orchestration instance and then updates its instance row.
/// </summary>
/// <remarks>
/// The steps run in this order:
/// 1. every event in <c>newRuntimeState.NewEvents</c> is serialized and uploaded through
///    <c>UploadHistoryBatch</c>, flushing a batch whenever the entity-count or estimated-size threshold is reached;
/// 2. the instance entity built from the lifecycle events seen along the way is written with
///    <c>InstancesTable.InsertOrMergeEntityAsync</c>;
/// 3. blobs that were recorded in the tracking context for a replaced execution are deleted.
/// </remarks>
/// <param name="newRuntimeState">Source of the new events (<c>NewEvents</c>), the full event list (<c>Events</c>) and the custom status (<c>Status</c>).</param>
/// <param name="oldRuntimeState">Only compared against <paramref name="newRuntimeState"/> (with <c>!=</c>) to decide whether previously tracked blobs should be deleted.</param>
/// <param name="instanceId">Orchestration instance ID; its escaped form is used as the partition key of all entities written here.</param>
/// <param name="executionId">Execution ID stored on the instance entity and on every history entity.</param>
/// <param name="eTagValue">ETag handed to <c>UploadHistoryBatch</c>; replaced by the value each upload returns.</param>
/// <param name="trackingStoreContext">Must be a <c>TrackingStoreContext</c> (it is cast unconditionally); its <c>Blobs</c> list is read and may be replaced.</param>
/// <param name="cancellationToken">Forwarded to <c>CompressLargeMessageAsync</c> and <c>UploadHistoryBatch</c>. It is not passed to the instance-row write or to the blob deletions.</param>
/// <returns>The ETag value returned by the last <c>UploadHistoryBatch</c> call.</returns>
public override async Task<ETag?> UpdateStateAsync(
OrchestrationRuntimeState newRuntimeState,
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
ETag? eTagValue,
object trackingStoreContext,
CancellationToken cancellationToken = default)
{
// Running size estimate of the current batch; reset to zero after every in-loop flush.
int estimatedBytes = 0;
IList<HistoryEvent> newEvents = newRuntimeState.NewEvents;
IList<HistoryEvent> allEvents = newRuntimeState.Events;
TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext;
// The episode number is only forwarded to UploadHistoryBatch and to the status log entry below.
int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState);
// Comma-separated list of the event type names in the current batch; handed to UploadHistoryBatch.
var newEventListBuffer = new StringBuilder(4000);
var historyEventBatch = new List<TableTransactionAction>();
// Status implied by the most recent lifecycle event seen in the loop; used only for logging.
OrchestrationStatus runtimeStatus = OrchestrationStatus.Running;
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
// Entity for the instances table: partition key is the escaped instance ID, row key is the empty string.
var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty)
{
// TODO: Translating null to "null" is a temporary workaround. We should prioritize
// https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary.
["CustomStatus"] = newRuntimeState.Status ?? "null",
["ExecutionId"] = executionId,
// NOTE(review): presumably System.Linq's Last(), which throws when newEvents is empty — confirm callers never pass an empty list.
["LastUpdatedTime"] = newEvents.Last().Timestamp,
};
// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario
List<string> blobsToDelete = null;
if (oldRuntimeState != newRuntimeState && context.Blobs.Count > 0)
{
// Detach the old list and give the context a fresh one, so the list passed to
// CompressLargeMessageAsync below is separate from the blobs scheduled for deletion.
blobsToDelete = context.Blobs;
context.Blobs = new List<string>();
}
for (int i = 0; i < newEvents.Count; i++)
{
bool isFinalEvent = i == newEvents.Count - 1;
HistoryEvent historyEvent = newEvents[i];
// For backwards compatibility, we convert timer timestamps to UTC prior to persisting to Azure Storage
// see: https://github.com/Azure/durabletask/pull/1138
Utils.ConvertDateTimeInHistoryEventsToUTC(historyEvent);
var historyEntity = TableEntityConverter.Serialize(historyEvent);
historyEntity.PartitionKey = sanitizedInstanceId;
newEventListBuffer.Append(historyEvent.EventType.ToString()).Append(',');
// The row key is the sequence number, which represents the chronological ordinal of the event.
// (allEvents.Count - newEvents.Count) is used as the offset of the first new event;
// this assumes Events already includes NewEvents — TODO confirm.
long sequenceNumber = i + (allEvents.Count - newEvents.Count);
// "X16" formats the number as 16 zero-padded uppercase hexadecimal digits.
historyEntity.RowKey = sequenceNumber.ToString("X16");
historyEntity["ExecutionId"] = executionId;
// NOTE(review): presumably moves oversized payloads to blobs and records their names in context.Blobs — confirm against the helper.
await this.CompressLargeMessageAsync(historyEntity, context.Blobs, cancellationToken);
// Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below.
historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, historyEntity));
// Keep track of the byte count to ensure we don't hit the 4 MB per-batch maximum
estimatedBytes += GetEstimatedByteCount(historyEntity);
// Monitor for orchestration instance events
// Each lifecycle event below updates RuntimeStatus (and related properties) on the instance entity.
switch (historyEvent.EventType)
{
case EventType.ExecutionStarted:
runtimeStatus = OrchestrationStatus.Running;
ExecutionStartedEvent executionStartedEvent = (ExecutionStartedEvent)historyEvent;
instanceEntity["Name"] = executionStartedEvent.Name;
instanceEntity["Version"] = executionStartedEvent.Version;
instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString();
// ScheduledStartTime is only written when the event carries a value.
if (executionStartedEvent.ScheduledStartTime.HasValue)
{
instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime;
}
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionStartedEvent.Input),
instancePropertyName: InputProperty,
data: executionStartedEvent.Input);
break;
case EventType.ExecutionCompleted:
ExecutionCompletedEvent executionCompleted = (ExecutionCompletedEvent)historyEvent;
runtimeStatus = executionCompleted.OrchestrationStatus;
instanceEntity["RuntimeStatus"] = executionCompleted.OrchestrationStatus.ToString();
// CompletedTime is the current UTC time at the moment of this update, not the event's Timestamp.
instanceEntity["CompletedTime"] = DateTime.UtcNow;
// When FailureDetails is present, its string form is used as the output data instead of Result.
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionCompleted.Result),
instancePropertyName: OutputProperty,
data: executionCompleted.FailureDetails?.ToString() ?? executionCompleted.Result);
break;
case EventType.ExecutionTerminated:
runtimeStatus = OrchestrationStatus.Terminated;
ExecutionTerminatedEvent executionTerminatedEvent = (ExecutionTerminatedEvent)historyEvent;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString();
instanceEntity["CompletedTime"] = DateTime.UtcNow;
// The terminate event's Input is stored under the instance's output property.
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionTerminatedEvent.Input),
instancePropertyName: OutputProperty,
data: executionTerminatedEvent.Input);
break;
case EventType.ExecutionSuspended:
runtimeStatus = OrchestrationStatus.Suspended;
ExecutionSuspendedEvent executionSuspendedEvent = (ExecutionSuspendedEvent)historyEvent;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Suspended.ToString();
// The suspend reason is stored under the instance's output property.
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionSuspendedEvent.Reason),
instancePropertyName: OutputProperty,
data: executionSuspendedEvent.Reason);
break;
case EventType.ExecutionResumed:
runtimeStatus = OrchestrationStatus.Running;
ExecutionResumedEvent executionResumedEvent = (ExecutionResumedEvent)historyEvent;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString();
// The resume reason is stored under the instance's output property.
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionResumedEvent.Reason),
instancePropertyName: OutputProperty,
data: executionResumedEvent.Reason);
break;
case EventType.ContinueAsNew:
runtimeStatus = OrchestrationStatus.ContinuedAsNew;
// NOTE(review): the event is cast to ExecutionCompletedEvent; presumably the ContinueAsNew event type derives from it — confirm.
ExecutionCompletedEvent executionCompletedEvent = (ExecutionCompletedEvent)historyEvent;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.ContinuedAsNew.ToString();
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionCompletedEvent.Result),
instancePropertyName: OutputProperty,
data: executionCompletedEvent.Result);
break;
}
// Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time.
// The size check runs after the entity was added, so a flushed batch can exceed the 3 MB threshold by the last entity's estimate.
// NOTE(review): flushing at 99 rather than 100 presumably leaves room for an entity added inside UploadHistoryBatch — confirm.
if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */)
{
// This flush is marked final only when it happens on the last new event.
eTagValue = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
historyEventBatch,
newEventListBuffer,
allEvents.Count,
episodeNumber,
estimatedBytes,
eTagValue,
isFinalBatch: isFinalEvent,
cancellationToken: cancellationToken);
// Reset local state for the next batch
newEventListBuffer.Clear();
historyEventBatch.Clear();
estimatedBytes = 0;
}
}
// First persistence step is to commit history to the history table. Messages must come after.
// The batch is empty here when the in-loop flush fired on the last event, in which case nothing more is uploaded.
if (historyEventBatch.Count > 0)
{
eTagValue = await this.UploadHistoryBatch(
instanceId,
sanitizedInstanceId,
executionId,
historyEventBatch,
newEventListBuffer,
allEvents.Count,
episodeNumber,
estimatedBytes,
eTagValue,
isFinalBatch: true,
cancellationToken: cancellationToken);
}
// Second step: write the instance entity, timing the call for the log entry below.
// NOTE(review): cancellationToken is not passed here or to DeleteBlobAsync below — confirm whether
// that is intentional (e.g. to avoid abandoning the update once history has been committed).
Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
this.settings.Logger.InstanceStatusUpdate(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
runtimeStatus,
episodeNumber,
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
// finally, delete orphaned blobs from the previous execution history.
// We had to wait until the new history has committed to make sure the blobs are no longer necessary.
if (blobsToDelete != null)
{
// All deletions are started first and then awaited together.
var tasks = new List<Task>(blobsToDelete.Count);
foreach (var blobName in blobsToDelete)
{
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
}
await Task.WhenAll(tasks);
}
return eTagValue;
}