// src/DurableTask.Emulator/LocalOrchestrationService.cs (500 lines of code) (raw)
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
namespace DurableTask.Emulator
{
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Entities;
/// <summary>
/// Fully functional in-proc orchestration service for testing
/// </summary>
public class LocalOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, IEntityOrchestrationService, IDisposable
{
// Per-session state blobs. Assigned in the constructor and not read anywhere in this class,
// hence the ReSharper suppression below.
// ReSharper disable once NotAccessedField.Local
Dictionary<string, byte[]> sessionState;
// Timer messages that are waiting for their fire time; guarded by timerLock.
readonly List<TaskMessage> timerMessages;
// Value reported for both the orchestration and the activity concurrency limits.
readonly int MaxConcurrentWorkItems = 20;
// Session queue feeding orchestration work items.
readonly PeekLockSessionQueue orchestratorQueue;
// Queue feeding activity work items.
readonly PeekLockQueue workerQueue;
// Service-wide stop signal; cancelled by StopAsync and Dispose.
readonly CancellationTokenSource cancellationTokenSource;
// dictionary<instanceId, dictionary<executionId, orchestrationState>>; guarded by thisLock.
readonly Dictionary<string, Dictionary<string, OrchestrationState>> instanceStore;
// Guards instanceStore and work-item completion.
readonly object thisLock = new object();
// Guards timerMessages.
readonly object timerLock = new object();
// Pending waiters keyed by "instanceId_executionId", or "instanceId_" for the latest execution.
readonly ConcurrentDictionary<string, TaskCompletionSource<OrchestrationState>> orchestrationWaiters;
// Serializer settings used when persisting orchestration history into session state.
static readonly JsonSerializerSettings StateJsonSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto };
/// <summary>
/// Initializes a new, empty <see cref="LocalOrchestrationService"/>.
/// </summary>
/// <remarks>
/// Only allocates the in-memory queues, stores and the stop signal; the timer scheduler is not
/// started here (see <see cref="StartAsync"/>).
/// </remarks>
public LocalOrchestrationService()
{
    // Stop signal shared by every polling operation.
    this.cancellationTokenSource = new CancellationTokenSource();

    // Message transport.
    this.workerQueue = new PeekLockQueue();
    this.orchestratorQueue = new PeekLockSessionQueue();

    // In-memory stores.
    this.instanceStore = new Dictionary<string, Dictionary<string, OrchestrationState>>();
    this.sessionState = new Dictionary<string, byte[]>();
    this.timerMessages = new List<TaskMessage>();
    this.orchestrationWaiters = new ConcurrentDictionary<string, TaskCompletionSource<OrchestrationState>>();
}
/// <summary>
/// Background loop that moves due timer messages from <c>timerMessages</c> onto the orchestrator queue.
/// </summary>
/// <remarks>
/// Sweeps the pending timers roughly once per second until the service-wide stop token is signaled.
/// The inter-sweep delay observes the stop token, so stopping the service ends the loop promptly
/// instead of after the remainder of the one-second delay.
/// </remarks>
/// <exception cref="InvalidOperationException">
/// A pending message does not carry a <see cref="TimerFiredEvent"/>.
/// </exception>
async Task TimerMessageSchedulerAsync()
{
    // Capture the token once: reading CancellationTokenSource.Token after the source has been
    // disposed throws, whereas an already-obtained token stays safe to query.
    CancellationToken stopToken = this.cancellationTokenSource.Token;

    while (!stopToken.IsCancellationRequested)
    {
        lock (this.timerLock)
        {
            DateTime now = DateTime.UtcNow;
            int index = 0;

            // In-place sweep: preserves list order for delivery and avoids copying the list.
            while (index < this.timerMessages.Count)
            {
                TaskMessage pending = this.timerMessages[index];
                if (!(pending.Event is TimerFiredEvent timerFired))
                {
                    // TODO : unobserved task exception (AFFANDAR)
                    throw new InvalidOperationException("Invalid timer message");
                }

                if (timerFired.FireAt <= now)
                {
                    this.orchestratorQueue.SendMessage(pending);
                    this.timerMessages.RemoveAt(index);
                }
                else
                {
                    index++;
                }
            }
        }

        try
        {
            await Task.Delay(TimeSpan.FromSeconds(1), stopToken);
        }
        catch (OperationCanceledException)
        {
            // Stop requested while sleeping: finish normally, as the loop condition would.
            break;
        }
    }
}
/******************************/
// management methods
/******************************/
/// <inheritdoc />
/// <remarks>Forwards to <see cref="CreateAsync(bool)"/> with <c>recreateInstanceStore</c> set to <c>true</c>.</remarks>
public Task CreateAsync() => this.CreateAsync(recreateInstanceStore: true);
/// <inheritdoc />
/// <remarks>No-op that returns an already completed task; <paramref name="recreateInstanceStore"/> is not used.</remarks>
public Task CreateAsync(bool recreateInstanceStore) => Task.FromResult<object>(null);
/// <inheritdoc />
/// <remarks>No-op that returns an already completed task.</remarks>
public Task CreateIfNotExistsAsync() => Task.FromResult<object>(null);
/// <inheritdoc />
/// <remarks>Forwards to <see cref="DeleteAsync(bool)"/> with <c>deleteInstanceStore</c> set to <c>true</c>.</remarks>
public Task DeleteAsync() => this.DeleteAsync(deleteInstanceStore: true);
/// <inheritdoc />
/// <remarks>No-op that returns an already completed task; <paramref name="deleteInstanceStore"/> is not used.</remarks>
public Task DeleteAsync(bool deleteInstanceStore) => Task.FromResult<object>(null);
/// <inheritdoc />
/// <remarks>
/// Queues the timer scheduler loop on the thread pool without awaiting it and returns an
/// already completed task.
/// </remarks>
public Task StartAsync()
{
    Func<Task> schedulerLoop = () => this.TimerMessageSchedulerAsync();
    Task.Run(schedulerLoop);
    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>
/// Signals the service-wide cancellation token and returns an already completed task;
/// <paramref name="isForced"/> is not used.
/// </remarks>
public Task StopAsync(bool isForced)
{
    Task completed = Task.FromResult<object>(null);
    this.cancellationTokenSource.Cancel();
    return completed;
}
/// <inheritdoc />
/// <remarks>Forwards to <see cref="StopAsync(bool)"/> with <c>isForced</c> set to <c>false</c>.</remarks>
public Task StopAsync() => this.StopAsync(isForced: false);
/// <summary>
/// Reports whether the given exception should be treated as transient.
/// </summary>
/// <param name="exception">The exception to classify; not inspected by this implementation.</param>
/// <returns>
/// Always <c>false</c>: this implementation never classifies an exception as transient.
/// </returns>
public bool IsTransientException(Exception exception) => false;
/******************************/
// client methods
/******************************/
/// <inheritdoc />
/// <remarks>
/// Forwards to the overload taking dedupe statuses, passing <c>null</c> for them.
/// </remarks>
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage) =>
    this.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses: null);
/// <inheritdoc />
/// <remarks>
/// Records a new <see cref="OrchestrationStatus.Pending"/> state for the instance and enqueues the
/// creation message on the orchestrator queue, both under <c>thisLock</c>.
/// Dedupe is evaluated against the MOST RECENT execution of the instance whose status is not
/// <see cref="OrchestrationStatus.ContinuedAsNew"/>. When <paramref name="dedupeStatuses"/> is
/// <c>null</c>, any such existing execution blocks creation.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="creationMessage"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The message does not carry an <see cref="ExecutionStartedEvent"/>.</exception>
/// <exception cref="OrchestrationAlreadyExistsException">
/// The latest execution of the instance is in a status that blocks re-creation.
/// </exception>
public virtual Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
    if (creationMessage == null)
    {
        throw new ArgumentNullException(nameof(creationMessage));
    }

    if (!(creationMessage.Event is ExecutionStartedEvent startedEvent))
    {
        throw new InvalidOperationException("Invalid creation task message");
    }

    string instanceId = creationMessage.OrchestrationInstance.InstanceId;
    string executionId = creationMessage.OrchestrationInstance.ExecutionId;

    lock (this.thisLock)
    {
        if (!this.instanceStore.TryGetValue(instanceId, out Dictionary<string, OrchestrationState> executions))
        {
            executions = new Dictionary<string, OrchestrationState>();
            this.instanceStore[instanceId] = executions;
        }

        // Newest first: the dedupe decision must look at the latest execution, not the oldest one.
        OrchestrationState latestState = executions.Values
            .OrderByDescending(existing => existing.CreatedTime)
            .FirstOrDefault(existing => existing.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew);

        if (latestState != null && (dedupeStatuses == null || dedupeStatuses.Contains(latestState.OrchestrationStatus)))
        {
            // An orchestration with same instance id is already running
            throw new OrchestrationAlreadyExistsException($"An orchestration with id '{instanceId}' already exists. It is in state {latestState.OrchestrationStatus}");
        }

        // One timestamp so CreatedTime and LastUpdatedTime agree for a freshly created instance.
        DateTime now = DateTime.UtcNow;
        var newState = new OrchestrationState
        {
            OrchestrationInstance = new OrchestrationInstance
            {
                InstanceId = instanceId,
                ExecutionId = executionId,
            },
            CreatedTime = now,
            LastUpdatedTime = now,
            OrchestrationStatus = OrchestrationStatus.Pending,
            Version = startedEvent.Version,
            Name = startedEvent.Name,
            Input = startedEvent.Input,
            ScheduledStartTime = startedEvent.ScheduledStartTime,
            Tags = startedEvent.Tags,
        };

        executions.Add(executionId, newState);
        this.orchestratorQueue.SendMessage(creationMessage);
    }

    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>Sends the single message through <see cref="SendTaskOrchestrationMessageBatchAsync"/>.</remarks>
public Task SendTaskOrchestrationMessageAsync(TaskMessage message) =>
    this.SendTaskOrchestrationMessageBatchAsync(message);
/// <inheritdoc />
/// <remarks>
/// Enqueues each message, in array order, on the orchestrator queue and returns an already
/// completed task.
/// </remarks>
public Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
{
    for (int i = 0; i < messages.Length; i++)
    {
        this.orchestratorQueue.SendMessage(messages[i]);
    }

    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>
/// <para>
/// Waiters are shared per key <c>instanceId_executionId</c>. A null or blank
/// <paramref name="executionId"/> waits on the key <c>instanceId_</c>, i.e. on the latest execution
/// of the instance.
/// </para>
/// <para>
/// If the store already holds a state that is neither Running nor Pending, the waiter is completed
/// immediately. When waiting at instance level, ContinuedAsNew is not treated as terminal.
/// </para>
/// <para>
/// The linked cancellation source created for the timeout is always disposed, including on the
/// timeout and cancellation paths.
/// </para>
/// </remarks>
/// <exception cref="TimeoutException">
/// The timeout elapsed, or the caller's token or the service stop token was signaled, before the
/// orchestration reached a terminal state.
/// </exception>
public async Task<OrchestrationState> WaitForOrchestrationAsync(
    string instanceId,
    string executionId,
    TimeSpan timeout,
    CancellationToken cancellationToken)
{
    bool waitOnLatestExecution = string.IsNullOrWhiteSpace(executionId);
    if (waitOnLatestExecution)
    {
        executionId = string.Empty;
    }

    string key = instanceId + "_" + executionId;

    // Atomically fetch the shared waiter for this key, creating it on first use.
    TaskCompletionSource<OrchestrationState> tcs = this.orchestrationWaiters.GetOrAdd(
        key,
        _ => new TaskCompletionSource<OrchestrationState>());

    // might have finished already
    lock (this.thisLock)
    {
        if (this.instanceStore.TryGetValue(instanceId, out Dictionary<string, OrchestrationState> stateMap)
            && stateMap != null
            && stateMap.Count > 0)
        {
            OrchestrationState state;
            if (waitOnLatestExecution)
            {
                state = stateMap.Values.OrderByDescending(os => os.CreatedTime).First();
            }
            else
            {
                stateMap.TryGetValue(executionId, out state);
            }

            bool finished = state != null
                && state.OrchestrationStatus != OrchestrationStatus.Running
                && state.OrchestrationStatus != OrchestrationStatus.Pending;

            // if only master id was specified then continueAsNew is a not a terminal state
            bool continuationOfInstanceWait = waitOnLatestExecution
                && state != null
                && state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew;

            if (finished && !continuationOfInstanceWait)
            {
                tcs.TrySetResult(state);
            }
        }
    }

    // A linked source registers callbacks on its parent tokens; dispose it to release them.
    using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken,
        this.cancellationTokenSource.Token))
    {
        Task timeOutTask = Task.Delay(timeout, cts.Token);
        Task ret = await Task.WhenAny(tcs.Task, timeOutTask);

        if (ret == timeOutTask)
        {
            throw new TimeoutException("timed out or canceled while waiting for orchestration to complete");
        }

        // The orchestration finished first: stop the pending delay timer.
        cts.Cancel();
        return await tcs.Task;
    }
}
/// <inheritdoc />
/// <remarks>
/// Looks up the state stored for the exact <paramref name="instanceId"/> /
/// <paramref name="executionId"/> pair under <c>thisLock</c>; yields <c>null</c> when either the
/// instance or the execution is unknown.
/// </remarks>
public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
{
    OrchestrationState found = null;

    lock (this.thisLock)
    {
        if (this.instanceStore.TryGetValue(instanceId, out Dictionary<string, OrchestrationState> executions))
        {
            // TryGetValue leaves 'found' as null when the execution id is not present.
            executions.TryGetValue(executionId, out found);
        }
    }

    return await Task.FromResult(found);
}
/// <inheritdoc />
/// <remarks>
/// Reads the in-memory instance store under <c>thisLock</c>.
/// When <paramref name="allExecutions"/> is <c>true</c>, every stored execution of the instance is
/// returned; otherwise only the execution with the greatest <c>CreatedTime</c> is returned.
/// An unknown instance yields an empty list, never <c>null</c>.
/// </remarks>
public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
{
    IList<OrchestrationState> response;

    lock (this.thisLock)
    {
        if (!this.instanceStore.TryGetValue(instanceId, out Dictionary<string, OrchestrationState> executions))
        {
            response = new List<OrchestrationState>();
        }
        else if (allExecutions)
        {
            response = executions.Values.ToList();
        }
        else
        {
            // Honor the flag: callers asking for a single execution expect the most recent one.
            response = executions.Values
                .OrderByDescending(execution => execution.CreatedTime)
                .Take(1)
                .ToList();
        }
    }

    return await Task.FromResult(response);
}
/// <inheritdoc />
/// <remarks>Not implemented by this service.</remarks>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId) =>
    throw new NotSupportedException();
/// <inheritdoc />
/// <remarks>Not implemented by this service.</remarks>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType) =>
    throw new NotSupportedException();
/******************************/
// Task orchestration methods
/******************************/
/// <inheritdoc />
/// <remarks>Returns the shared <c>MaxConcurrentWorkItems</c> value.</remarks>
public int MaxConcurrentTaskOrchestrationWorkItems
{
    get { return this.MaxConcurrentWorkItems; }
}
/// <inheritdoc />
/// <remarks>
/// Waits for the next available session on the orchestrator queue, honoring both
/// <paramref name="cancellationToken"/> and the service stop token. Returns <c>null</c> when no
/// session was accepted. The work item's runtime state is rebuilt from the session's serialized
/// history, or is a fresh empty state when the session has none. The lock expiry is set to five
/// minutes from now.
/// </remarks>
public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
    TimeSpan receiveTimeout,
    CancellationToken cancellationToken)
{
    TaskSession taskSession;

    // Dispose the linked source after each poll; otherwise every call leaves a registration
    // behind on the long-lived service token.
    using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken,
        this.cancellationTokenSource.Token))
    {
        taskSession = await this.orchestratorQueue.AcceptSessionAsync(receiveTimeout, linkedCts.Token);
    }

    if (taskSession == null)
    {
        return null;
    }

    var wi = new TaskOrchestrationWorkItem
    {
        NewMessages = taskSession.Messages.ToList(),
        InstanceId = taskSession.Id,
        LockedUntilUtc = DateTime.UtcNow.AddMinutes(5),
        OrchestrationRuntimeState =
            DeserializeOrchestrationRuntimeState(taskSession.SessionState) ??
            new OrchestrationRuntimeState(),
    };

    return wi;
}
/// <inheritdoc />
/// <remarks>
/// Runs entirely under <c>thisLock</c>, in this order:
/// completes the session (persisting serialized history only while the orchestration is still
/// Running), forwards outbound messages to the worker queue, registers new timers, and finally
/// commits orchestration state to the instance store.
/// </remarks>
public Task CompleteTaskOrchestrationWorkItemAsync(
    TaskOrchestrationWorkItem workItem,
    OrchestrationRuntimeState newOrchestrationRuntimeState,
    IList<TaskMessage> outboundMessages,
    IList<TaskMessage> orchestratorMessages,
    IList<TaskMessage> workItemTimerMessages,
    TaskMessage continuedAsNewMessage,
    OrchestrationState state)
{
    lock (this.thisLock)
    {
        // Session state is kept only for an orchestration that has started and is still running.
        bool keepSessionState =
            newOrchestrationRuntimeState != null &&
            newOrchestrationRuntimeState.ExecutionStartedEvent != null &&
            newOrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.Running;

        byte[] newSessionState = keepSessionState
            ? this.SerializeOrchestrationRuntimeState(newOrchestrationRuntimeState)
            : null;

        this.orchestratorQueue.CompleteSession(
            workItem.InstanceId,
            newSessionState,
            orchestratorMessages,
            continuedAsNewMessage);

        if (outboundMessages != null)
        {
            foreach (TaskMessage outbound in outboundMessages)
            {
                // TODO : make async (AFFANDAR)
                this.workerQueue.SendMessageAsync(outbound);
            }
        }

        if (workItemTimerMessages != null)
        {
            lock (this.timerLock)
            {
                foreach (TaskMessage timer in workItemTimerMessages)
                {
                    this.timerMessages.Add(timer);
                }
            }
        }

        // The runtime state object was replaced: commit the state built from the work item's
        // original runtime state as well.
        if (workItem.OrchestrationRuntimeState != newOrchestrationRuntimeState)
        {
            var previousState = Utils.BuildOrchestrationState(workItem.OrchestrationRuntimeState);
            this.CommitState(workItem.OrchestrationRuntimeState, previousState).GetAwaiter().GetResult();
        }

        if (state != null)
        {
            this.CommitState(newOrchestrationRuntimeState, state).GetAwaiter().GetResult();
        }
    }

    return Task.FromResult(0);
}
/// <summary>
/// Stores <paramref name="state"/> for the execution described by <paramref name="runtimeState"/>
/// and, when the state is terminal, signals any registered waiters.
/// </summary>
/// <remarks>
/// <para>
/// The visible caller invokes this while holding <c>thisLock</c>. Waiters are therefore completed
/// on the thread pool and NOT waited for: a waiter's continuation may run inline on the completing
/// thread and call back into this service, which would deadlock if this method blocked on it while
/// the lock is held.
/// </para>
/// <para>
/// Running and Pending are not terminal. For instance-level waiters (key <c>instanceId_</c>),
/// ContinuedAsNew is not terminal either.
/// </para>
/// </remarks>
/// <param name="runtimeState">Supplies the instance id and execution id to store under.</param>
/// <param name="state">The orchestration state to record.</param>
/// <returns>An already completed task.</returns>
Task CommitState(OrchestrationRuntimeState runtimeState, OrchestrationState state)
{
    string instanceId = runtimeState.OrchestrationInstance.InstanceId;
    string executionId = runtimeState.OrchestrationInstance.ExecutionId;

    if (!this.instanceStore.TryGetValue(instanceId, out Dictionary<string, OrchestrationState> mapState))
    {
        mapState = new Dictionary<string, OrchestrationState>();
        this.instanceStore[instanceId] = mapState;
    }

    mapState[executionId] = state;

    // signal any waiters waiting on instanceid_executionid or just the latest instanceid_
    if (state.OrchestrationStatus == OrchestrationStatus.Running
        || state.OrchestrationStatus == OrchestrationStatus.Pending)
    {
        return Task.FromResult(0);
    }

    string executionKey = instanceId + "_" + executionId;
    string instanceKey = instanceId + "_";

    if (this.orchestrationWaiters.TryGetValue(executionKey, out TaskCompletionSource<OrchestrationState> executionWaiter))
    {
        // Fire and forget: TrySetResult cannot throw, and blocking here risks a deadlock.
        Task.Run(() => executionWaiter.TrySetResult(state));
    }

    // for instance id level waiters, we will not consider ContinueAsNew as a terminal state because
    // the high level orchestration is still ongoing
    if (state.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew
        && this.orchestrationWaiters.TryGetValue(instanceKey, out TaskCompletionSource<OrchestrationState> instanceWaiter))
    {
        Task.Run(() => instanceWaiter.TrySetResult(state));
    }

    return Task.FromResult(0);
}
/// <inheritdoc />
/// <remarks>Abandons the session identified by the work item's instance id on the orchestrator queue.</remarks>
public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
    string sessionId = workItem.InstanceId;
    this.orchestratorQueue.AbandonSession(sessionId);
    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>No-op that returns an already completed task; <paramref name="workItem"/> is not used.</remarks>
public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem) =>
    Task.FromResult<object>(null);
/// <inheritdoc />
/// <remarks>Fixed at 1.</remarks>
public int TaskActivityDispatcherCount
{
    get { return 1; }
}
/// <summary>
/// Controls whether raised events that were not yet processed are carried over to the next
/// iteration of an orchestration on ContinueAsNew. This service always carries them over.
/// </summary>
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew
{
    get { return BehaviorOnContinueAsNew.Carryover; }
}
/// <inheritdoc />
/// <remarks>Returns the shared <c>MaxConcurrentWorkItems</c> value.</remarks>
public int MaxConcurrentTaskActivityWorkItems
{
    get { return this.MaxConcurrentWorkItems; }
}
/// <inheritdoc />
/// <remarks>
/// Builds an <see cref="ExecutionTerminatedEvent"/> (event id -1) carrying
/// <paramref name="message"/>, addressed to <paramref name="instanceId"/> with no execution id,
/// and sends it through <see cref="SendTaskOrchestrationMessageAsync"/>.
/// </remarks>
public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string message)
{
    var target = new OrchestrationInstance { InstanceId = instanceId };
    var terminationEvent = new ExecutionTerminatedEvent(-1, message);

    var taskMessage = new TaskMessage
    {
        OrchestrationInstance = target,
        Event = terminationEvent
    };

    await this.SendTaskOrchestrationMessageAsync(taskMessage);
}
/// <inheritdoc />
/// <remarks>Pushes the work item's current lock expiry out by five minutes.</remarks>
public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
{
    DateTime extendedExpiry = workItem.LockedUntilUtc.AddMinutes(5);
    workItem.LockedUntilUtc = extendedExpiry;
    return Task.FromResult(0);
}
/// <inheritdoc />
/// <remarks>Always <c>false</c>; neither argument is inspected.</remarks>
public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState) => false;
/// <inheritdoc />
/// <remarks>Always 0; <paramref name="exception"/> is not inspected.</remarks>
public int GetDelayInSecondsAfterOnProcessException(Exception exception) => 0;
/// <inheritdoc />
/// <remarks>Always 0; <paramref name="exception"/> is not inspected.</remarks>
public int GetDelayInSecondsAfterOnFetchException(Exception exception) => 0;
/// <inheritdoc />
/// <remarks>Fixed at 1.</remarks>
public int TaskOrchestrationDispatcherCount
{
    get { return 1; }
}
/******************************/
// Task activity methods
/******************************/
/// <inheritdoc />
/// <remarks>
/// Waits for the next message on the worker queue, honoring both
/// <paramref name="cancellationToken"/> and the service stop token. Returns <c>null</c> when no
/// message was received. The lock expiry is set to five minutes from now.
/// </remarks>
public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
    TaskMessage taskMessage;

    // Dispose the linked source after each poll; otherwise every call leaves a registration
    // behind on the long-lived service token.
    using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken,
        this.cancellationTokenSource.Token))
    {
        taskMessage = await this.workerQueue.ReceiveMessageAsync(receiveTimeout, linkedCts.Token);
    }

    if (taskMessage == null)
    {
        return null;
    }

    return new TaskActivityWorkItem
    {
        // The in-memory provider assigns no real id: complete/abandon identify the work item by
        // its TaskMessage reference, so a placeholder id is used here.
        Id = "N/A",
        LockedUntilUtc = DateTime.UtcNow.AddMinutes(5),
        TaskMessage = taskMessage,
    };
}
/// <inheritdoc />
/// <remarks>Abandons the work item's message on the worker queue and returns an already completed task.</remarks>
public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
    TaskMessage abandoned = workItem.TaskMessage;
    this.workerQueue.AbandonMessageAsync(abandoned);
    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>
/// Under <c>thisLock</c>, completes the activity's message on the worker queue and then enqueues
/// <paramref name="responseMessage"/> on the orchestrator queue.
/// </remarks>
public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
{
    lock (this.thisLock)
    {
        TaskMessage finished = workItem.TaskMessage;
        this.workerQueue.CompleteMessageAsync(finished);
        this.orchestratorQueue.SendMessage(responseMessage);
    }

    return Task.FromResult<object>(null);
}
/// <inheritdoc />
/// <remarks>
/// Pushes the work item's current lock expiry out by five minutes and returns the same work item.
/// </remarks>
public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
{
    // TODO : add expiration if we want to unit test it (AFFANDAR)
    DateTime extendedExpiry = workItem.LockedUntilUtc.AddMinutes(5);
    workItem.LockedUntilUtc = extendedExpiry;
    return Task.FromResult(workItem);
}
/// <summary>
/// Serializes the runtime state's event list to UTF-8 encoded JSON using <c>StateJsonSettings</c>.
/// </summary>
/// <param name="runtimeState">The state whose <c>Events</c> are serialized; may be <c>null</c>.</param>
/// <returns>The encoded bytes, or <c>null</c> when <paramref name="runtimeState"/> is <c>null</c>.</returns>
byte[] SerializeOrchestrationRuntimeState(OrchestrationRuntimeState runtimeState)
{
    return runtimeState == null
        ? null
        : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(runtimeState.Events, StateJsonSettings));
}
/// <summary>
/// Rebuilds a runtime state from UTF-8 encoded JSON produced with <c>StateJsonSettings</c>.
/// </summary>
/// <param name="stateBytes">The serialized event list; may be <c>null</c> or empty.</param>
/// <returns>
/// A runtime state wrapping the decoded events, or <c>null</c> when <paramref name="stateBytes"/>
/// is <c>null</c> or empty.
/// </returns>
OrchestrationRuntimeState DeserializeOrchestrationRuntimeState(byte[] stateBytes)
{
    bool hasPayload = stateBytes != null && stateBytes.Length != 0;
    if (!hasPayload)
    {
        return null;
    }

    string json = Encoding.UTF8.GetString(stateBytes);
    IList<HistoryEvent> history = JsonConvert.DeserializeObject<IList<HistoryEvent>>(json, StateJsonSettings);
    return new OrchestrationRuntimeState(history);
}
/// <inheritdoc />
/// <remarks>
/// Signals the service-wide cancellation token and releases it. Safe to call more than once.
/// </remarks>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Releases the resources held by this instance.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>; nothing is released otherwise.
/// </param>
void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    try
    {
        this.cancellationTokenSource.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // A previous Dispose call already released the source; repeated Dispose must not throw.
    }
    finally
    {
        // CancellationTokenSource.Dispose is itself safe to call repeatedly.
        this.cancellationTokenSource.Dispose();
    }
}
/// <inheritdoc />
/// <remarks>
/// Test only for core entities; the values are fixed defaults. A new properties object is built
/// on every access.
/// </remarks>
EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties
{
    get
    {
        return new EntityBackendProperties
        {
            EntityMessageReorderWindow = TimeSpan.FromMinutes(30),
            MaxEntityOperationBatchSize = null,
            MaxConcurrentTaskEntityWorkItems = 100,
            SupportsImplicitEntityDeletion = false, // not supported by this backend
            MaximumSignalDelayTime = TimeSpan.FromDays(6),
            UseSeparateQueueForEntityWorkItems = false,
        };
    }
}
/// <inheritdoc />
/// <remarks>Always <c>null</c> in this implementation.</remarks>
EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries
{
    get { return null; }
}
/// <inheritdoc />
/// <remarks>
/// Delegates to <see cref="LockNextTaskOrchestrationWorkItemAsync"/>, so entity work items are
/// taken from the same queue as orchestration work items.
/// </remarks>
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextEntityWorkItemAsync(
    TimeSpan receiveTimeout,
    CancellationToken cancellationToken) =>
    this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
/// <inheritdoc />
/// <remarks>Delegates to <see cref="LockNextTaskOrchestrationWorkItemAsync"/>.</remarks>
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(
    TimeSpan receiveTimeout,
    CancellationToken cancellationToken) =>
    this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
}