in src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs [204:366]
/// <summary>
/// Persists the outcome of one orchestration episode for the session of <paramref name="workItem"/>.
/// Inside a single transaction it:
/// queues outbound activity messages, timer messages and orchestrator messages;
/// completes the consumed session messages;
/// updates the session state;
/// and, for non-terminal orchestrations, writes <paramref name="orchestrationState"/> to the instance store.
/// After a successful commit it finalizes the queued batches, enqueues the target sessions and,
/// when the orchestration is complete, runs the completion handling.
/// </summary>
/// <param name="workItem">The work item whose episode is being completed.</param>
/// <param name="newOrchestrationRuntimeState">
/// The runtime state produced by the episode. May be <c>null</c> (the framework passes <c>null</c> when the
/// orchestration completes); in that case the work item's current runtime state is persisted instead.
/// </param>
/// <param name="outboundMessages">Messages for the activities provider; may be <c>null</c> or empty.</param>
/// <param name="orchestratorMessages">Messages appended through the orchestration provider; may be <c>null</c> or empty.</param>
/// <param name="timerMessages">Messages for the scheduled-messages provider; may be <c>null</c> or empty.</param>
/// <param name="continuedAsNewMessage">Message appended when the orchestration continued as new; may be <c>null</c>.</param>
/// <param name="orchestrationState">State written to the instance store (if one is configured); may be <c>null</c>.</param>
public async Task CompleteTaskOrchestrationWorkItemAsync(
    TaskOrchestrationWorkItem workItem,
    OrchestrationRuntimeState newOrchestrationRuntimeState,
    IList<TaskMessage> outboundMessages,
    IList<TaskMessage> orchestratorMessages,
    IList<TaskMessage> timerMessages,
    TaskMessage continuedAsNewMessage,
    OrchestrationState orchestrationState)
{
    SessionInformation sessionInfo = GetSessionInfo(workItem.InstanceId);
    bool isComplete;

    try
    {
        var orchestrationStatus = workItem.OrchestrationRuntimeState.OrchestrationStatus;
        ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
            workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
            $"Current orchestration status: {orchestrationStatus}");
        isComplete = this.IsOrchestrationComplete(orchestrationStatus);
    }
    catch (InvalidOperationException ex) when (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
    {
        // OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
        // This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration.
        // Do not process such a work item: drop it and stop here. Continuing would re-read OrchestrationStatus
        // inside the transaction below and fail with the same exception.
        // Any other InvalidOperationException is not handled here and propagates to the caller.
        ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping the bad orchestration to avoid noise.");
        await this.DropOrchestrationAsync(workItem);
        return;
    }

    // Filled in by the transaction below and consumed only after it has committed.
    IList<OrchestrationInstance> sessionsToEnqueue = null;
    List<Message<Guid, TaskMessageItem>> scheduledMessages = null;
    List<Message<string, TaskMessageItem>> activityMessages = null;

    await RetryHelper.ExecuteWithRetryOnTransient(async () =>
    {
        bool retryOnException;
        do
        {
            try
            {
                retryOnException = false;
                sessionsToEnqueue = null;
                scheduledMessages = null;
                activityMessages = null;

                using (var txn = this.stateManager.CreateTransaction())
                {
                    if (outboundMessages?.Count > 0)
                    {
                        activityMessages = outboundMessages.Select(m => new Message<string, TaskMessageItem>(Guid.NewGuid().ToString(), new TaskMessageItem(m))).ToList();
                        await this.activitiesProvider.SendBatchBeginAsync(txn, activityMessages);
                    }

                    if (timerMessages?.Count > 0)
                    {
                        scheduledMessages = timerMessages.Select(m => new Message<Guid, TaskMessageItem>(Guid.NewGuid(), new TaskMessageItem(m))).ToList();
                        await this.scheduledMessagesProvider.SendBatchBeginAsync(txn, scheduledMessages);
                    }

                    if (orchestratorMessages?.Count > 0)
                    {
                        // Orchestrator messages are always appended through the orchestration provider, including
                        // those emitted by sub-orchestrations. A former variant routed a sub-orchestration's messages
                        // through TryAppendMessageBatchAsync; it was disabled to allow nested (multi-level) sub-orchestrations.
                        await this.orchestrationProvider.AppendMessageBatchAsync(txn, orchestratorMessages.Select(tm => new TaskMessageItem(tm)));
                        sessionsToEnqueue = orchestratorMessages.Select(m => m.OrchestrationInstance).ToList();
                    }

                    if (continuedAsNewMessage != null)
                    {
                        await this.orchestrationProvider.AppendMessageAsync(txn, new TaskMessageItem(continuedAsNewMessage));

                        // Add to (rather than replace) the sessions collected above. Orchestrator messages appended
                        // in this same transaction still need their target sessions enqueued after commit.
                        sessionsToEnqueue = sessionsToEnqueue ?? new List<OrchestrationInstance>();
                        sessionsToEnqueue.Add(continuedAsNewMessage.OrchestrationInstance);
                    }

                    await this.orchestrationProvider.CompleteMessages(txn, sessionInfo.Instance, sessionInfo.LockTokens);

                    if (workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
                    {
                        // NOTE(review): this call is not given 'txn' and sits inside the retry loop. It therefore runs
                        // before the commit and again on every retry; confirm HandleCompletedOrchestrationAsync is idempotent.
                        await HandleCompletedOrchestrationAsync(workItem);
                    }

                    // Dropping a completed orchestration's session involves 2 steps:
                    // (1) removing the row from sessions, and
                    // (2) dropping the session messages dictionary.
                    // Step (2) runs on a background thread for performance, so it is not part of any transaction.
                    // If we dropped the session in this transaction and the transaction then failed, the session state
                    // would not be updated, yet the messages needed to reach the complete state would already be gone
                    // (when the orchestration is picked up again in the next fetch).
                    // So the session is not dropped here; it is dropped in a subsequent, separate transaction.
                    //
                    // The framework passes 'null' for 'newOrchestrationRuntimeState' when the orchestration completes.
                    // Suppose we stored null, this transaction succeeded, and a node failure then prevented the subsequent
                    // transaction. The orchestration's runtime state would be lost, and it could never be marked complete.
                    // So when 'newOrchestrationRuntimeState' is null, the work item's runtime state is stored instead.
                    // The retry path below also clears 'newOrchestrationRuntimeState', so its instance must come from
                    // the coalesced state, never from the possibly-null parameter.
                    // The next transaction removes the session row anyway, so nothing is lost by not storing 'null' here.
                    OrchestrationRuntimeState stateToPersist = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState;
                    await this.orchestrationProvider.UpdateSessionState(txn, stateToPersist.OrchestrationInstance, stateToPersist);

                    // Skip the instance store write once the orchestration has reached a terminal state. Otherwise, after
                    // waiting for an orchestration to complete, a new orchestration with the same name could not start
                    // immediately because the session is still in the store. On completion the instance store is
                    // updated, and the session dropped, as part of the next atomic transaction.
                    if (this.instanceStore != null && orchestrationState != null && !isComplete)
                    {
                        await this.instanceStore.WriteEntitiesAsync(txn, new InstanceEntityBase[]
                        {
                            new OrchestrationStateInstanceEntity()
                            {
                                State = orchestrationState
                            }
                        });
                    }

                    await txn.CommitAsync();
                }
            }
            catch (FabricReplicationOperationTooLargeException ex)
            {
                // The transaction payload was too large to replicate. Retry with the episode's outputs discarded and
                // the orchestration marked as failed, so that its session can still make progress.
                // NOTE(review): 'continuedAsNewMessage' is not reset here. If it is itself the oversized payload,
                // this loop keeps retrying. Confirm whether it should also be cleared.
                ServiceFabricProviderEventSource.Tracing.ExceptionInReliableCollectionOperations($"OrchestrationInstance = {sessionInfo.Instance}, Action = {nameof(CompleteTaskOrchestrationWorkItemAsync)}", ex.ToString());
                retryOnException = true;
                newOrchestrationRuntimeState = null;
                outboundMessages = null;
                timerMessages = null;
                orchestratorMessages = null;
                if (orchestrationState != null)
                {
                    orchestrationState.OrchestrationStatus = OrchestrationStatus.Failed;
                    orchestrationState.Output = $"Fabric exception when trying to process orchestration: {ex}. Investigate and consider reducing the serialization size of orchestration inputs/outputs/overall length to avoid the issue.";
                }
            }
        } while (retryOnException);
    }, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(CompleteTaskOrchestrationWorkItemAsync)}'");

    // Post-commit work: finalize the message batches begun in the transaction and notify the target sessions.
    if (activityMessages != null)
    {
        this.activitiesProvider.SendBatchComplete(activityMessages);
    }

    if (scheduledMessages != null)
    {
        this.scheduledMessagesProvider.SendBatchComplete(scheduledMessages);
    }

    if (sessionsToEnqueue != null)
    {
        foreach (var instance in sessionsToEnqueue)
        {
            this.orchestrationProvider.TryEnqueueSession(instance);
        }
    }

    if (isComplete)
    {
        await HandleCompletedOrchestrationAsync(workItem);
    }
}