async Task UploadHistoryBatch()

in src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs [1164:1249]


        /// <summary>
        /// Appends the sentinel entity to <paramref name="historyEventBatch"/>, submits the whole batch to the
        /// history table, and returns the ETag reported for the sentinel row.
        /// </summary>
        /// <param name="instanceId">Instance ID reported in log entries.</param>
        /// <param name="sanitizedInstanceId">Partition key used for the sentinel entity.</param>
        /// <param name="executionId">Execution ID stored on the sentinel entity and reported in log entries.</param>
        /// <param name="historyEventBatch">
        /// Table actions to submit. This method mutates the list by appending the sentinel action to its end.
        /// </param>
        /// <param name="historyEventNamesBuffer">
        /// Names of the events in the batch, used for logging only. Its last character is dropped before logging
        /// (expected to be a trailing comma); an empty buffer is logged as an empty string.
        /// </param>
        /// <param name="numberOfTotalEvents">Total event count, reported in log entries.</param>
        /// <param name="episodeNumber">Episode number, reported in the success log entry.</param>
        /// <param name="estimatedBatchSizeInBytes">Estimated batch size, reported in the success log entry.</param>
        /// <param name="eTagValue">
        /// ETag of the existing sentinel row. When non-null the sentinel is merged conditionally on this ETag;
        /// when null the sentinel is added as a new row.
        /// </param>
        /// <param name="isFinalBatch">
        /// Whether this is the final batch; when true the sentinel is marked checkpoint-complete and stamped with the
        /// current UTC time.
        /// </param>
        /// <param name="cancellationToken">Token passed through to the batch execution.</param>
        /// <returns>
        /// The ETag from the response that corresponds to the sentinel row, or <c>null</c> if no sentinel response
        /// was found.
        /// </returns>
        /// <exception cref="DurableTaskStorageException">
        /// Rethrown unchanged when the batch fails. Conflict (409) and PreconditionFailed (412) failures are logged
        /// as a detected split-brain before being rethrown.
        /// </exception>
        async Task<ETag?> UploadHistoryBatch(
            string instanceId,
            string sanitizedInstanceId,
            string executionId,
            IList<TableTransactionAction> historyEventBatch,
            StringBuilder historyEventNamesBuffer,
            int numberOfTotalEvents,
            int episodeNumber,
            int estimatedBatchSizeInBytes,
            ETag? eTagValue,
            bool isFinalBatch,
            CancellationToken cancellationToken)
        {
            // Adding / updating sentinel entity
            TableEntity sentinelEntity = new TableEntity(sanitizedInstanceId, SentinelRowKey)
            {
                ["ExecutionId"] = executionId,
                [IsCheckpointCompleteProperty] = isFinalBatch,
            };

            if (isFinalBatch)
            {
                sentinelEntity[CheckpointCompletedTimestampProperty] = DateTime.UtcNow;
            }

            // With a known ETag the sentinel is merged conditionally on that ETag; without one it is added as a
            // new row. Either way the sentinel is the LAST action in the batch.
            if (eTagValue.HasValue)
            {
                historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.UpdateMerge, sentinelEntity, eTagValue.Value));
            }
            else
            {
                historyEventBatch.Add(new TableTransactionAction(TableTransactionActionType.Add, sentinelEntity));
            }

            // Values shared by both log paths. The names buffer ends with a trailing comma that is removed here;
            // guarding the empty case keeps a logging detail from throwing ArgumentOutOfRangeException and hiding
            // the real storage failure inside the catch block below.
            int eventCount = historyEventBatch.Count - 1; // exclude sentinel from count
            string eventNames = historyEventNamesBuffer.Length > 0
                ? historyEventNamesBuffer.ToString(0, historyEventNamesBuffer.Length - 1)
                : string.Empty;

            TableTransactionResults resultInfo;
            Stopwatch stopwatch = Stopwatch.StartNew();
            try
            {
                resultInfo = await this.HistoryTable.ExecuteBatchAsync(historyEventBatch, cancellationToken);
            }
            catch (DurableTaskStorageException ex)
            {
                // Another writer has already updated this instance's history:
                //  - PreconditionFailed (412): the conditional merge failed because the stored ETag no longer
                //    matches eTagValue.
                //  - Conflict (409): an "Add" failed because the row already exists, e.g. the sentinel when
                //    eTagValue is null.
                bool isSplitBrain =
                    ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed ||
                    ex.HttpStatusCode == (int)HttpStatusCode.Conflict;

                if (isSplitBrain)
                {
                    this.settings.Logger.SplitBrainDetected(
                        this.storageAccountName,
                        this.taskHubName,
                        instanceId,
                        executionId,
                        eventCount,
                        numberOfTotalEvents,
                        eventNames,
                        stopwatch.ElapsedMilliseconds,
                        eTagValue?.ToString());
                }

                // Always propagate with the original stack trace; callers decide how to react.
                throw;
            }

            // The sentinel was appended last, so scanning from the end finds it on the first match.
            // NOTE(review): assumes responses are positionally aligned with the batch actions - confirm.
            IReadOnlyList<Response> responses = resultInfo.Responses;
            ETag? newETagValue = null;
            for (int i = responses.Count - 1; i >= 0; i--)
            {
                if (historyEventBatch[i].Entity.RowKey == SentinelRowKey)
                {
                    newETagValue = responses[i].Headers.ETag;
                    break;
                }
            }

            this.settings.Logger.AppendedInstanceHistory(
                this.storageAccountName,
                this.taskHubName,
                instanceId,
                executionId,
                eventCount,
                numberOfTotalEvents,
                eventNames,
                episodeNumber,
                resultInfo.ElapsedMilliseconds,
                estimatedBatchSizeInBytes,
                string.Concat(eTagValue?.ToString() ?? "(null)", " --> ", newETagValue?.ToString() ?? "(null)"),
                isFinalBatch);

            return newETagValue;
        }