in src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs [537:838]
internal async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
string functionName,
FunctionType functionType,
bool oneWay,
string instanceId,
string operation,
RetryOptions retryOptions,
object input,
DateTime? scheduledTimeUtc)
{
this.ThrowIfInvalidAccess();
if (retryOptions != null)
{
if (!this.durabilityProvider.ValidateDelayTime(retryOptions.MaxRetryInterval, out string errorMessage))
{
throw new ArgumentException(errorMessage, nameof(retryOptions.MaxRetryInterval));
}
if (!this.durabilityProvider.ValidateDelayTime(retryOptions.FirstRetryInterval, out errorMessage))
{
throw new ArgumentException(errorMessage, nameof(retryOptions.FirstRetryInterval));
}
}
// Propagate the default version to orchestrators.
// TODO: Decide whether we want to propagate the default version to actitities and entities as well.
string version = (functionType == FunctionType.Orchestrator)
? this.Config.Options.DefaultVersion
: string.Empty;
this.Config.ThrowIfFunctionDoesNotExist(functionName, functionType);
Task<TResult> callTask = null;
EntityId? lockToUse = null;
string operationId = string.Empty;
string operationName = string.Empty;
switch (functionType)
{
case FunctionType.Activity:
System.Diagnostics.Debug.Assert(instanceId == null, "The instanceId parameter should not be used for activity functions.");
System.Diagnostics.Debug.Assert(operation == null, "The operation parameter should not be used for activity functions.");
System.Diagnostics.Debug.Assert(!oneWay, "The oneWay parameter should not be used for activity functions.");
if (retryOptions == null)
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.ScheduleTask<TResult>(functionName, version, input);
}
else
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.ScheduleWithRetry<TResult>(
functionName,
version,
retryOptions.GetRetryOptions(),
input);
}
break;
case FunctionType.Orchestrator:
// Instance IDs should not be reused when creating sub-orchestrations. This is a best-effort
// check. We cannot easily check the full hierarchy, so we just look at the current orchestration
// and the immediate parent.
if (string.Equals(instanceId, this.InstanceId, StringComparison.OrdinalIgnoreCase) ||
(this.ParentInstanceId != null && string.Equals(instanceId, this.ParentInstanceId, StringComparison.OrdinalIgnoreCase)))
{
throw new ArgumentException("The instance ID of a sub-orchestration must be different than the instance ID of a parent orchestration.");
}
System.Diagnostics.Debug.Assert(operation == null, "The operation parameter should not be used for activity functions.");
if (instanceId != null && instanceId.StartsWith("@"))
{
throw new ArgumentException(nameof(instanceId), "Orchestration instance ids must not start with @");
}
if (oneWay)
{
this.IncrementActionsOrThrowException();
var dummyTask = this.InnerContext.CreateSubOrchestrationInstance<TResult>(
functionName,
version,
instanceId,
input,
new Dictionary<string, string>() { { OrchestrationTags.FireAndForget, "" } });
System.Diagnostics.Debug.Assert(dummyTask.IsCompleted, "task should be fire-and-forget");
}
else
{
if (this.ContextLocks != null)
{
throw new LockingRulesViolationException("While holding locks, cannot call suborchestrators.");
}
if (retryOptions == null)
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.CreateSubOrchestrationInstance<TResult>(
functionName,
version,
instanceId,
input);
}
else
{
this.IncrementActionsOrThrowException();
callTask = this.InnerContext.CreateSubOrchestrationInstanceWithRetry<TResult>(
functionName,
version,
instanceId,
retryOptions.GetRetryOptions(),
input);
}
}
break;
case FunctionType.Entity:
System.Diagnostics.Debug.Assert(operation != null, "The operation parameter is required.");
System.Diagnostics.Debug.Assert(retryOptions == null, "Retries are not supported for entity calls.");
System.Diagnostics.Debug.Assert(instanceId != null, "Entity calls need to specify the target entity.");
if (this.ContextLocks != null)
{
lockToUse = EntityId.GetEntityIdFromSchedulerId(instanceId);
if (oneWay)
{
if (this.ContextLocks.Contains(lockToUse.Value))
{
throw new LockingRulesViolationException("While holding locks, cannot signal entities whose lock is held.");
}
}
else
{
if (!this.ContextLocks.Remove(lockToUse.Value))
{
throw new LockingRulesViolationException("While holding locks, cannot call entities whose lock is not held.");
}
}
}
var guid = this.NewGuid(); // deterministically replayable unique id for this request
var target = new OrchestrationInstance() { InstanceId = instanceId };
operationId = guid.ToString();
operationName = operation;
var request = new RequestMessage()
{
ParentInstanceId = this.InstanceId,
ParentExecutionId = this.ExecutionId,
Id = guid,
IsSignal = oneWay,
Operation = operation,
ScheduledTime = scheduledTimeUtc,
};
if (input != null)
{
request.SetInput(input, this.messageDataConverter);
}
this.SendEntityMessage(target, request);
if (!oneWay)
{
callTask = this.WaitForEntityResponse<TResult>(guid, lockToUse);
}
break;
default:
throw new InvalidOperationException($"Unexpected function type '{functionType}'.");
}
string sourceFunctionId = this.FunctionName;
this.Config.TraceHelper.FunctionScheduled(
this.Config.Options.HubName,
functionName,
this.InstanceId,
reason: sourceFunctionId,
functionType: functionType,
isReplay: this.IsReplaying);
TResult output;
Exception exception = null;
if (oneWay)
{
return default(TResult);
}
System.Diagnostics.Debug.Assert(callTask != null, "Two-way operations are asynchronous, so callTask must not be null.");
try
{
output = await callTask;
}
catch (TaskFailedException e)
{
// Check to see if CallHttpAsync() threw a TimeoutException
// In this case, we want to throw a TimeoutException instead of a FunctionFailedException
if (functionName.Equals(HttpOptions.HttpTaskActivityReservedName) &&
(e.InnerException is TimeoutException || e.InnerException is HttpRequestException))
{
if (e.InnerException is HttpRequestException)
{
throw new HttpRequestException(e.Message);
}
throw e.InnerException;
}
exception = e;
string message = string.Format(
"The {0} function '{1}' failed: \"{2}\". See the function execution logs for additional details.",
functionType.ToString().ToLowerInvariant(),
functionName,
e.InnerException?.Message);
throw new FunctionFailedException(message, e.InnerException);
}
catch (SubOrchestrationFailedException e)
{
exception = e;
string message = string.Format(
"The {0} function '{1}' failed: \"{2}\". See the function execution logs for additional details.",
functionType.ToString().ToLowerInvariant(),
functionName,
e.InnerException?.Message);
throw new FunctionFailedException(message, e.InnerException);
}
catch (Exception e)
{
exception = e;
throw;
}
finally
{
if (exception != null && this.IsReplaying)
{
// If this were not a replay, then the orchestrator/activity/entity function trigger would have already
// emitted a FunctionFailed trace with the full exception details.
if (functionType == FunctionType.Entity)
{
this.Config.TraceHelper.OperationFailed(
this.Config.Options.HubName,
functionName,
this.InstanceId,
operationId,
operationName,
input: "(replayed)",
exception: exception,
duration: 0,
isReplay: true);
}
else
{
this.Config.TraceHelper.FunctionFailed(
this.Config.Options.HubName,
functionName,
this.InstanceId,
exception: exception,
functionType: functionType,
isReplay: true);
}
}
}
if (this.IsReplaying)
{
// If this were not a replay, then the orchestrator/activity/entity function trigger would have already
// emitted a FunctionCompleted trace with the actual output details.
if (functionType == FunctionType.Entity)
{
this.Config.TraceHelper.OperationCompleted(
this.Config.Options.HubName,
functionName,
this.InstanceId,
operationId,
operationName,
input: "(replayed)",
output: "(replayed)",
duration: 0,
isReplay: true);
}
else
{
this.Config.TraceHelper.FunctionCompleted(
this.Config.Options.HubName,
functionName,
this.InstanceId,
output: "(replayed)",
continuedAsNew: false,
functionType: functionType,
isReplay: true);
}
}
return output;
}