src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs (486 lines of code) (raw):
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
namespace DurableTask.AzureServiceFabric
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Fabric;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Tracking;
using DurableTask.AzureServiceFabric.Stores;
using DurableTask.AzureServiceFabric.TaskHelpers;
using DurableTask.AzureServiceFabric.Tracing;
using Microsoft.ServiceFabric.Data;
using Newtonsoft.Json;
class FabricOrchestrationService : IOrchestrationService
{
readonly IReliableStateManager stateManager;
readonly IFabricOrchestrationServiceInstanceStore instanceStore;
readonly SessionProvider orchestrationProvider;
readonly ActivityProvider activitiesProvider;
readonly ScheduledMessageProvider scheduledMessagesProvider;
readonly FabricOrchestrationProviderSettings settings;
readonly CancellationTokenSource cancellationTokenSource;
ConcurrentDictionary<string, SessionInformation> sessionInfos = new ConcurrentDictionary<string, SessionInformation>();
public FabricOrchestrationService(IReliableStateManager stateManager,
SessionProvider orchestrationProvider,
IFabricOrchestrationServiceInstanceStore instanceStore,
FabricOrchestrationProviderSettings settings,
CancellationTokenSource cancellationTokenSource)
{
this.stateManager = stateManager ?? throw new ArgumentNullException(nameof(stateManager));
this.orchestrationProvider = orchestrationProvider;
this.instanceStore = instanceStore;
this.settings = settings;
this.cancellationTokenSource = cancellationTokenSource;
this.activitiesProvider = new ActivityProvider(this.stateManager, Constants.ActivitiesQueueName, cancellationTokenSource.Token);
this.scheduledMessagesProvider = new ScheduledMessageProvider(this.stateManager, Constants.ScheduledMessagesDictionaryName, orchestrationProvider, cancellationTokenSource.Token);
}
public Task StartAsync()
{
return Task.WhenAll(this.activitiesProvider.StartAsync(),
this.scheduledMessagesProvider.StartAsync(),
this.instanceStore.StartAsync(),
this.orchestrationProvider.StartAsync());
}
public Task StopAsync()
{
return StopAsync(false);
}
public Task StopAsync(bool isForced)
{
if (!this.cancellationTokenSource.IsCancellationRequested)
{
this.cancellationTokenSource.Cancel();
}
return Task.CompletedTask;
}
public Task CreateAsync()
{
return CreateAsync(true);
}
public Task CreateAsync(bool recreateInstanceStore)
{
return DeleteAsync(deleteInstanceStore: recreateInstanceStore);
// Actual creation will be done on demand when we call GetOrAddAsync in StartAsync method.
}
public Task CreateIfNotExistsAsync()
{
return Task.CompletedTask;
}
public Task DeleteAsync()
{
return DeleteAsync(true);
}
public Task DeleteAsync(bool deleteInstanceStore)
{
List<Task> tasks = new List<Task>();
tasks.Add(this.stateManager.RemoveAsync(Constants.OrchestrationDictionaryName));
tasks.Add(this.stateManager.RemoveAsync(Constants.ScheduledMessagesDictionaryName));
tasks.Add(this.stateManager.RemoveAsync(Constants.ActivitiesQueueName));
if (deleteInstanceStore)
{
tasks.Add(this.stateManager.RemoveAsync(Constants.InstanceStoreDictionaryName));
}
return Task.WhenAll(tasks);
}
public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
{
return false;
}
public int GetDelayInSecondsAfterOnProcessException(Exception exception)
{
return GetDelayForFetchOrProcessException(exception);
}
public int GetDelayInSecondsAfterOnFetchException(Exception exception)
{
return GetDelayForFetchOrProcessException(exception);
}
public int TaskOrchestrationDispatcherCount => this.settings.TaskOrchestrationDispatcherSettings.DispatcherCount;
public int MaxConcurrentTaskOrchestrationWorkItems => this.settings.TaskOrchestrationDispatcherSettings.MaxConcurrentOrchestrations;
// Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation.
public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
var currentSession = await this.orchestrationProvider.AcceptSessionAsync(receiveTimeout);
if (currentSession == null)
{
return null;
}
List<Message<Guid, TaskMessageItem>> newMessages;
try
{
newMessages = await this.orchestrationProvider.ReceiveSessionMessagesAsync(currentSession);
var currentRuntimeState = new OrchestrationRuntimeState(currentSession.SessionState);
var workItem = new TaskOrchestrationWorkItem()
{
NewMessages = newMessages.Select(m => m.Value.TaskMessage).ToList(),
InstanceId = currentSession.SessionId.InstanceId,
OrchestrationRuntimeState = currentRuntimeState
};
if (newMessages.Count == 0)
{
if (currentRuntimeState.ExecutionStartedEvent == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"Orchestration with no execution started event found: {currentSession.SessionId}");
return null;
}
bool isComplete = this.IsOrchestrationComplete(currentRuntimeState.OrchestrationStatus);
if (isComplete)
{
await this.HandleCompletedOrchestrationAsync(workItem);
}
this.orchestrationProvider.TryUnlockSession(currentSession.SessionId, isComplete: isComplete);
return null;
}
var sessionInfo = new SessionInformation()
{
Instance = currentSession.SessionId,
LockTokens = newMessages.Select(m => m.Key).ToList()
};
if (!this.sessionInfos.TryAdd(workItem.InstanceId, sessionInfo))
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"{nameof(FabricOrchestrationService)}.{nameof(LockNextTaskOrchestrationWorkItemAsync)} : Multiple receivers processing the same session : {currentSession.SessionId.InstanceId}?");
}
return workItem;
}
catch (Exception)
{
this.orchestrationProvider.TryUnlockSession(currentSession.SessionId, abandon: true);
throw;
}
}
public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
{
return Task.CompletedTask;
}
public async Task CompleteTaskOrchestrationWorkItemAsync(
TaskOrchestrationWorkItem workItem,
OrchestrationRuntimeState newOrchestrationRuntimeState,
IList<TaskMessage> outboundMessages,
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
{
SessionInformation sessionInfo = GetSessionInfo(workItem.InstanceId);
bool isComplete = false;
try
{
var orchestrationStatus = workItem.OrchestrationRuntimeState.OrchestrationStatus;
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
$"Current orchestration status: {orchestrationStatus}");
isComplete = this.IsOrchestrationComplete(orchestrationStatus);
}
catch (InvalidOperationException ex)
{
// OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
// Do not process the orchestration workitem if 'ExecutionStartedEvent' is missing.
// This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping the bad orchestration to avoid noise.");
await this.DropOrchestrationAsync(workItem);
}
}
IList<OrchestrationInstance> sessionsToEnqueue = null;
List<Message<Guid, TaskMessageItem>> scheduledMessages = null;
List<Message<string, TaskMessageItem>> activityMessages = null;
await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
bool retryOnException;
do
{
try
{
retryOnException = false;
sessionsToEnqueue = null;
scheduledMessages = null;
activityMessages = null;
using (var txn = this.stateManager.CreateTransaction())
{
if (outboundMessages?.Count > 0)
{
activityMessages = outboundMessages.Select(m => new Message<string, TaskMessageItem>(Guid.NewGuid().ToString(), new TaskMessageItem(m))).ToList();
await this.activitiesProvider.SendBatchBeginAsync(txn, activityMessages);
}
if (timerMessages?.Count > 0)
{
scheduledMessages = timerMessages.Select(m => new Message<Guid, TaskMessageItem>(Guid.NewGuid(), new TaskMessageItem(m))).ToList();
await this.scheduledMessagesProvider.SendBatchBeginAsync(txn, scheduledMessages);
}
if (orchestratorMessages?.Count > 0)
{
// Commenting this code to allow nested (multi-level) suborchestrations
// If the suborchestration has orchestration messages then we need process them under suborchestration's
// session provider except for another suborchestration called from current suborchestraiton.
//if (workItem.OrchestrationRuntimeState?.ParentInstance != null)
//{
// sessionsToEnqueue = await this.orchestrationProvider.TryAppendMessageBatchAsync(txn, orchestratorMessages.Select(tm => new TaskMessageItem(tm)));
//}
//else
{
await this.orchestrationProvider.AppendMessageBatchAsync(txn, orchestratorMessages.Select(tm => new TaskMessageItem(tm)));
sessionsToEnqueue = orchestratorMessages.Select(m => m.OrchestrationInstance).ToList();
}
}
if (continuedAsNewMessage != null)
{
await this.orchestrationProvider.AppendMessageAsync(txn, new TaskMessageItem(continuedAsNewMessage));
sessionsToEnqueue = new List<OrchestrationInstance>() { continuedAsNewMessage.OrchestrationInstance };
}
await this.orchestrationProvider.CompleteMessages(txn, sessionInfo.Instance, sessionInfo.LockTokens);
if (workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
await HandleCompletedOrchestrationAsync(workItem);
}
// When an orchestration is completed, we need to drop the session which involves 2 steps (1) Removing the row from sessions
// (2) Dropping the session messages dictionary. The second step is done in background thread for performance so is not
// part of transaction. Since it will happen outside the trasanction, if this transaction fails for some reason and we dropped
// the session as part of this transaction, we wouldn't have updated the session state but would have lost the messages
// in the session messages dictionary which are needed for state to reach complete state (when the orchestration is picked up again in next fetch).
// So we don't want to drop session as part of this transaction.
// Instead, we drop the session as part of a subsequent different transaction.
// However, framework passes us 'null' value for 'newOrchestrationRuntimeState' when orchestration is completed and
// if we updated the session state to null and this transaction succeded, and a node failures occurs and we
// never call the subsequent transaction, we will lose the runtime state of orchestration and never will be able to
// mark it as complete even if it is. So we use the work item's runtime state when 'newOrchestrationRuntimeState' is null
// so that the latest state is what is stored for the session.
// As part of next transaction, we are going to remove the row anyway for the session and it doesn't matter to update it to 'null'.
await this.orchestrationProvider.UpdateSessionState(txn, newOrchestrationRuntimeState.OrchestrationInstance, newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState);
// We skip writing to instanceStore when orchestration reached terminal state to avoid a minor timing issue that
// wait for an orchestration completes but another orchestration with the same name cannot be started immediately
// because the session is still in store. We update the instance store on orchestration completion and drop the
// session as part of the next atomic transaction.
if (this.instanceStore != null && orchestrationState != null && !isComplete)
{
await this.instanceStore.WriteEntitiesAsync(txn, new InstanceEntityBase[]
{
new OrchestrationStateInstanceEntity()
{
State = orchestrationState
}
});
}
await txn.CommitAsync();
}
}
catch (FabricReplicationOperationTooLargeException ex)
{
ServiceFabricProviderEventSource.Tracing.ExceptionInReliableCollectionOperations($"OrchestrationInstance = {sessionInfo.Instance}, Action = {nameof(CompleteTaskOrchestrationWorkItemAsync)}", ex.ToString());
retryOnException = true;
newOrchestrationRuntimeState = null;
outboundMessages = null;
timerMessages = null;
orchestratorMessages = null;
if (orchestrationState != null)
{
orchestrationState.OrchestrationStatus = OrchestrationStatus.Failed;
orchestrationState.Output = $"Fabric exception when trying to process orchestration: {ex}. Investigate and consider reducing the serialization size of orchestration inputs/outputs/overall length to avoid the issue.";
}
}
} while (retryOnException);
}, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(CompleteTaskOrchestrationWorkItemAsync)}'");
if (activityMessages != null)
{
this.activitiesProvider.SendBatchComplete(activityMessages);
}
if (scheduledMessages != null)
{
this.scheduledMessagesProvider.SendBatchComplete(scheduledMessages);
}
if (sessionsToEnqueue != null)
{
foreach (var instance in sessionsToEnqueue)
{
this.orchestrationProvider.TryEnqueueSession(instance);
}
}
if (isComplete)
{
await HandleCompletedOrchestrationAsync(workItem);
}
}
async Task DropOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await CompleteOrchestrationAsync(workItem);
string message = $"{nameof(DropOrchestrationAsync)}: Dropped. Orchestration history: {JsonConvert.SerializeObject(workItem.OrchestrationRuntimeState.Events)}";
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
message);
}
// Caller should ensure the workItem has reached terminal state.
async Task HandleCompletedOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await CompleteOrchestrationAsync(workItem);
string message = string.Format("Orchestration with instanceId : '{0}' and executionId : '{1}' Finished with the status {2} and result {3} in {4} seconds.",
workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
workItem.OrchestrationRuntimeState.OrchestrationStatus.ToString(),
workItem.OrchestrationRuntimeState.Output,
(workItem.OrchestrationRuntimeState.CompletedTime - workItem.OrchestrationRuntimeState.CreatedTime).TotalSeconds);
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
message);
}
async Task CompleteOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
using (var txn = this.stateManager.CreateTransaction())
{
await this.instanceStore.WriteEntitiesAsync(txn, new InstanceEntityBase[]
{
new OrchestrationStateInstanceEntity()
{
State = Utils.BuildOrchestrationState(workItem)
}
});
var instance = workItem.OrchestrationRuntimeState.OrchestrationInstance;
if (instance == null)
{
// This condition happens when an orchestration message like ExecutionTerminatedEvent enqueued for an already completed orchestration
SessionInformation sessionInfo = this.GetSessionInfo(workItem.InstanceId);
instance = sessionInfo.Instance;
}
// DropSession does 2 things (like mentioned in the comments above) - remove the row from sessions dictionary
// and delete the session messages dictionary. The second step is in a background thread and not part of transaction.
// However even if this transaction failed but we ended up deleting session messages dictionary, that's ok - at
// that time, it should be an empty dictionary and we would have updated the runtime session state to full completed
// state in the transaction from Complete method. So the subsequent attempt would be able to complete the session.
await this.orchestrationProvider.DropSessionAsync(txn, instance);
await txn.CommitAsync();
}
}, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(CompleteOrchestrationAsync)}'");
this.instanceStore.OnOrchestrationCompleted(workItem.OrchestrationRuntimeState.OrchestrationInstance);
}
public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
SessionInformation sessionInfo = this.TryRemoveSessionInfo(workItem.InstanceId);
if (sessionInfo == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"{nameof(AbandonTaskOrchestrationWorkItemAsync)} : Could not get a session info object while trying to abandon session {workItem.InstanceId}");
}
else
{
this.orchestrationProvider.TryUnlockSession(sessionInfo.Instance, abandon: true);
}
return Task.CompletedTask;
}
public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
bool isComplete = false;
try
{
isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
}
catch (InvalidOperationException ex)
{
// OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
// This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping/Unlocking the session as completed.");
isComplete = true;
}
}
SessionInformation sessionInfo = this.TryRemoveSessionInfo(workItem.InstanceId);
if (sessionInfo != null)
{
this.orchestrationProvider.TryUnlockSession(sessionInfo.Instance, isComplete: isComplete);
}
return Task.CompletedTask;
}
public int TaskActivityDispatcherCount => this.settings.TaskActivityDispatcherSettings.DispatcherCount;
public int MaxConcurrentTaskActivityWorkItems => this.settings.TaskActivityDispatcherSettings.MaxConcurrentActivities;
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } = BehaviorOnContinueAsNew.Ignore;
// Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation.
public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
var message = await this.activitiesProvider.ReceiveAsync(receiveTimeout);
if (message != null)
{
return new TaskActivityWorkItem()
{
Id = message.Key,
TaskMessage = message.Value.TaskMessage
};
}
return null;
}
public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
{
bool added = false;
await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
bool retryOnException;
do
{
try
{
added = false;
retryOnException = false;
using (var txn = this.stateManager.CreateTransaction())
{
await this.activitiesProvider.CompleteAsync(txn, workItem.Id);
added = await this.orchestrationProvider.TryAppendMessageAsync(txn, new TaskMessageItem(responseMessage));
await txn.CommitAsync();
}
}
catch (FabricReplicationOperationTooLargeException ex)
{
ServiceFabricProviderEventSource.Tracing.ExceptionInReliableCollectionOperations($"OrchestrationInstance = {responseMessage.OrchestrationInstance}, ActivityId = {workItem.Id}, Action = {nameof(CompleteTaskActivityWorkItemAsync)}", ex.ToString());
retryOnException = true;
var originalEvent = responseMessage.Event;
int taskScheduledId = GetTaskScheduledId(originalEvent);
string details = $"Fabric exception when trying to save activity result: {ex}. Consider reducing the serialization size of activity result to avoid the issue.";
responseMessage.Event = new TaskFailedEvent(originalEvent.EventId, taskScheduledId, ex.Message, details);
}
} while (retryOnException);
}, uniqueActionIdentifier: $"Orchestration = '{responseMessage.OrchestrationInstance}', ActivityId = '{workItem.Id}', Action = '{nameof(CompleteTaskActivityWorkItemAsync)}'");
if (added)
{
this.orchestrationProvider.TryEnqueueSession(responseMessage.OrchestrationInstance);
}
}
public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
bool removed = false;
using (var txn = this.stateManager.CreateTransaction())
{
// Remove task activity if orchestration was already terminated or cleaned up
if (!await this.orchestrationProvider.ContainsSessionAsync(txn, workItem.TaskMessage.OrchestrationInstance))
{
var errorMessage = $"Session doesn't exist. Dropping TaskActivity for Orchestration = '{workItem.TaskMessage.OrchestrationInstance}', ActivityId = '{workItem.Id}', Action = '{nameof(AbandonTaskActivityWorkItemAsync)}'";
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition(errorMessage);
await this.activitiesProvider.CompleteAsync(txn, workItem.Id);
removed = true;
}
await txn.CommitAsync();
}
if (!removed)
{
// Re-Enqueue task activity
this.activitiesProvider.Abandon(workItem.Id);
}
}
public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
{
return Task.FromResult(workItem);
}
int GetTaskScheduledId(HistoryEvent historyEvent)
{
TaskCompletedEvent tce = historyEvent as TaskCompletedEvent;
if (tce != null)
{
return tce.TaskScheduledId;
}
TaskFailedEvent tfe = historyEvent as TaskFailedEvent;
if (tfe != null)
{
return tfe.TaskScheduledId;
}
return -1;
}
int GetDelayForFetchOrProcessException(Exception exception)
{
//Todo: Need to fine tune
if (exception is TimeoutException)
{
return 1;
}
if (exception is FabricNotReadableException)
{
return 2;
}
return 0;
}
bool IsOrchestrationComplete(OrchestrationStatus status)
{
return !(status.IsRunningOrPending() || status == OrchestrationStatus.ContinuedAsNew);
}
SessionInformation GetSessionInfo(string sessionId)
{
ServiceFabricProviderEventSource.Tracing.TraceMessage(sessionId, $"{nameof(GetSessionInfo)} - Getting session info");
if (!this.sessionInfos.TryGetValue(sessionId, out SessionInformation sessionInfo))
{
var message = $"{nameof(GetSessionInfo)}. Trying to get a session that's not in locked sessions {sessionId}";
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition(message);
throw new Exception(message);
}
return sessionInfo;
}
SessionInformation TryRemoveSessionInfo(string sessionId)
{
var removed = this.sessionInfos.TryRemove(sessionId, out SessionInformation sessionInfo);
ServiceFabricProviderEventSource.Tracing.TraceMessage(sessionId, $"{nameof(TryRemoveSessionInfo)}: Removed = {removed}");
return sessionInfo;
}
class SessionInformation
{
public OrchestrationInstance Instance { get; set; }
public List<Guid> LockTokens { get; set; }
}
}
}