in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [1164:1249]
async Task<ETag?> UploadHistoryBatch(
string instanceId,
string sanitizedInstanceId,
string executionId,
IList<TableTransactionAction> historyEventBatch,
StringBuilder historyEventNamesBuffer,
int numberOfTotalEvents,
int episodeNumber,
int estimatedBatchSizeInBytes,
ETag? eTagValue,
bool isFinalBatch,
CancellationToken cancellationToken)
{
// Adding / updating sentinel entity
TableEntity sentinelEntity = new TableEntity(sanitizedInstanceId, SentinelRowKey)
{
["ExecutionId"] = executionId,
[IsCheckpointCompleteProperty] = isFinalBatch,
};
if (isFinalBatch)
{
sentinelEntity[CheckpointCompletedTimestampProperty] = DateTime.UtcNow;
}
if (eTagValue != null)
{
historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpdateMerge, sentinelEntity, eTagValue.GetValueOrDefault()));
}
else
{
historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.Add, sentinelEntity));
}
TableTransactionResults resultInfo;
Stopwatch stopwatch = Stopwatch.StartNew();
try
{
resultInfo = await this.HistoryTable.ExecuteBatchAsync(historyEventBatch, cancellationToken);
}
catch (DurableTaskStorageException ex)
{
if (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
{
this.settings.Logger.SplitBrainDetected(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
historyEventBatch.Count - 1, // exclude sentinel from count
numberOfTotalEvents,
historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1), // remove trailing comma
stopwatch.ElapsedMilliseconds,
eTagValue?.ToString());
}
throw;
}
IReadOnlyList<Response> responses = resultInfo.Responses;
ETag? newETagValue = null;
for (int i = responses.Count - 1; i >= 0; i--)
{
if (historyEventBatch[i].Entity.RowKey == SentinelRowKey)
{
newETagValue = responses[i].Headers.ETag;
break;
}
}
this.settings.Logger.AppendedInstanceHistory(
this.storageAccountName,
this.taskHubName,
instanceId,
executionId,
historyEventBatch.Count - 1, // exclude sentinel from count
numberOfTotalEvents,
historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1), // remove trailing comma
episodeNumber,
resultInfo.ElapsedMilliseconds,
estimatedBatchSizeInBytes,
string.Concat(eTagValue?.ToString() ?? "(null)", " --> ", newETagValue?.ToString() ?? "(null)"),
isFinalBatch);
return newETagValue;
}