in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [1456:1520]
async Task ProcessTrackingWorkItemAsync(TrackingWorkItem workItem)
{
var sessionState = workItem.SessionInstance as ServiceBusOrchestrationSession;
if (sessionState == null)
{
throw new ArgumentNullException(nameof(workItem.SessionInstance));
}
var historyEntities = new List<OrchestrationWorkItemInstanceEntity>();
var stateEntities = new List<OrchestrationStateInstanceEntity>();
foreach (TaskMessage taskMessage in workItem.NewMessages)
{
if (taskMessage.Event.EventType == EventType.HistoryState)
{
stateEntities.Add(new OrchestrationStateInstanceEntity
{
State = (taskMessage.Event as HistoryStateEvent)?.State,
SequenceNumber = taskMessage.SequenceNumber
});
}
else
{
historyEntities.Add(new OrchestrationWorkItemInstanceEntity
{
InstanceId = taskMessage.OrchestrationInstance.InstanceId,
ExecutionId = taskMessage.OrchestrationInstance.ExecutionId,
SequenceNumber = taskMessage.SequenceNumber,
EventTimestamp = DateTime.UtcNow,
HistoryEvent = taskMessage.Event
});
}
}
TraceEntities(TraceEventType.Verbose, "Writing tracking history event", historyEntities, GetNormalizedWorkItemEvent);
TraceEntities(TraceEventType.Verbose, "Writing tracking state event", stateEntities, GetNormalizedStateEvent);
try
{
await this.InstanceStore.WriteEntitiesAsync(historyEntities);
}
catch (Exception e) when (!Utils.IsFatal(e))
{
TraceEntities(TraceEventType.Critical, $"Failed to write history entity: {e}", historyEntities, GetNormalizedWorkItemEvent);
throw;
}
try
{
// TODO : send batch to instance store, it can write it as individual if it chooses
foreach (OrchestrationStateInstanceEntity stateEntity in stateEntities)
{
await this.InstanceStore.WriteEntitiesAsync(new List<OrchestrationStateInstanceEntity> { stateEntity });
}
}
catch (Exception e) when (!Utils.IsFatal(e))
{
TraceEntities(TraceEventType.Critical, $"Failed to write state entity: {e}", stateEntities, GetNormalizedStateEvent);
throw;
}
// Cleanup our session
await sessionState.Session.CompleteAsync(sessionState.LockTokens.Values);
await sessionState.Session.CloseAsync();
}