src/Microsoft.Azure.WebJobs.Host/Executors/HostMessageExecutor.cs (116 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Indexers;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Loggers;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Microsoft.Azure.WebJobs.Host.Executors
{
internal class HostMessageExecutor
{
private readonly IFunctionExecutor _innerExecutor;
private readonly IFunctionIndexLookup _functionLookup;
private readonly IFunctionInstanceLogger _functionInstanceLogger;
private readonly ILoggerFactory _loggerFactory;
public HostMessageExecutor(IFunctionExecutor innerExecutor, IFunctionIndexLookup functionLookup, IFunctionInstanceLogger functionInstanceLogger, ILoggerFactory loggerFactory)
{
_innerExecutor = innerExecutor;
_functionLookup = functionLookup;
_functionInstanceLogger = functionInstanceLogger;
_loggerFactory = loggerFactory;
}
public async Task<FunctionResult> ExecuteAsync(string value, CancellationToken cancellationToken)
{
HostMessage model = JsonConvert.DeserializeObject<HostMessage>(value, JsonSerialization.Settings);
if (model == null)
{
throw new InvalidOperationException("Invalid invocation message.");
}
CallAndOverrideMessage callAndOverrideModel = model as CallAndOverrideMessage;
if (callAndOverrideModel != null)
{
await ProcessCallAndOverrideMessage(callAndOverrideModel, cancellationToken);
return new FunctionResult(true);
}
AbortHostInstanceMessage abortModel = model as AbortHostInstanceMessage;
if (abortModel != null)
{
ProcessAbortHostInstanceMessage();
return new FunctionResult(true);
}
string error = String.Format(CultureInfo.InvariantCulture, "Unsupported invocation type '{0}'.", model.Type);
throw new NotSupportedException(error);
}
// This snapshot won't contain full normal data for Function.FullName, Function.ShortName and Function.Parameters.
// (All we know is an unavailable function ID; which function location method info to use is a mystery.)
private static FunctionCompletedMessage CreateFailedMessage(CallAndOverrideMessage message)
{
DateTimeOffset startAndEndTime = DateTimeOffset.UtcNow;
Exception exception = new InvalidOperationException(string.Format(CultureInfo.CurrentCulture,
"No function '{0}' currently exists.", message.FunctionId));
// In theory, we could also set HostId, HostInstanceId and WebJobRunId; we'd just have to expose that data
// directly to this Worker class.
return new FunctionCompletedMessage
{
FunctionInstanceId = message.Id,
Function = new FunctionDescriptor
{
Id = message.FunctionId
},
Arguments = message.Arguments,
ParentId = message.ParentId,
Reason = message.Reason,
StartTime = startAndEndTime,
EndTime = startAndEndTime,
Failure = new FunctionFailure
{
Exception = exception,
ExceptionType = exception.GetType().FullName,
ExceptionDetails = exception.Message
}
};
}
private IFunctionInstance CreateFunctionInstance(CallAndOverrideMessage message, IFunctionDefinition function)
{
IDictionary<string, object> objectParameters = new Dictionary<string, object>();
if (message.Arguments != null)
{
foreach (KeyValuePair<string, string> item in message.Arguments)
{
objectParameters.Add(item.Key, item.Value);
}
}
var context = new FunctionInstanceFactoryContext
{
Id = message.Id,
ParentId = message.ParentId,
ExecutionReason = message.Reason,
Parameters = objectParameters
};
return function.InstanceFactory.Create(context);
}
private async Task ProcessCallAndOverrideMessage(CallAndOverrideMessage message, CancellationToken cancellationToken)
{
IFunctionDefinition function = _functionLookup.Lookup(message.FunctionId);
if (function != null)
{
Func<IFunctionInstance> instanceFactory = () => CreateFunctionInstance(message, function);
await _innerExecutor.TryExecuteAsync(instanceFactory, _loggerFactory, cancellationToken);
}
else
{
// Log that the function failed.
FunctionCompletedMessage failedMessage = CreateFailedMessage(message);
_functionInstanceLogger.LogFunctionCompleted(failedMessage);
}
}
private static void ProcessAbortHostInstanceMessage()
{
bool terminated = NativeMethods.TerminateProcess(NativeMethods.GetCurrentProcess(), 1);
Debug.Assert(terminated);
}
}
}