in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [152:232]
public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string instanceId, string expectedExecutionId, CancellationToken cancellationToken = default)
{
TableQueryResults<TableEntity> results = await this
.GetHistoryEntitiesResponseInfoAsync(instanceId, expectedExecutionId, null, cancellationToken)
.GetResultsAsync(cancellationToken: cancellationToken);
// The sentinel row should always be the last row
TableEntity sentinel = results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);
IList<HistoryEvent> historyEvents;
string executionId;
TrackingStoreContext trackingStoreContext = new TrackingStoreContext();
// If expectedExecutionId is provided but it does not match the sentinel executionId,
// it may belong to a previous generation. In that case, treat it as an unknown executionId
// and skip loading history.
if (results.Entities.Count > 0 && (expectedExecutionId == null ||
expectedExecutionId == sentinel?.GetString("ExecutionId")))
{
// The most recent generation will always be in the first history event.
executionId = sentinel?.GetString("ExecutionId") ?? results.Entities[0].GetString("ExecutionId");
// Convert the table entities into history events.
var events = new List<HistoryEvent>(results.Entities.Count);
foreach (TableEntity entity in results.Entities)
{
if (entity.GetString("ExecutionId") != executionId)
{
// The remaining entities are from a previous generation and can be discarded.
break;
}
// The sentinel row does not contain any history events, so ignore and continue
if (entity == sentinel)
{
continue;
}
// Some entity properties may be stored in blob storage.
await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs, cancellationToken);
events.Add((HistoryEvent)TableEntityConverter.Deserialize(entity, GetTypeForTableEntity(entity)));
}
historyEvents = events;
}
else
{
historyEvents = Array.Empty<HistoryEvent>();
executionId = expectedExecutionId;
}
// Read the checkpoint completion time from the sentinel row.
// A sentinel won't exist only if no instance of this ID has ever existed or the instance history
// was purged. The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
DateTime checkpointCompletionTime = DateTime.MinValue;
ETag? eTagValue = sentinel?.ETag;
if (sentinel != null &&
sentinel.TryGetValue(CheckpointCompletedTimestampProperty, out object timestampObj) &&
timestampObj is DateTimeOffset timestampProperty)
{
checkpointCompletionTime = timestampProperty.DateTime;
}
int currentEpisodeNumber = Utils.GetEpisodeNumber(historyEvents);
this.settings.Logger.FetchedInstanceHistory(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
historyEvents.Count,
currentEpisodeNumber,
results.RequestCount,
results.ElapsedMilliseconds,
eTagValue?.ToString(),
checkpointCompletionTime);
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext);
}