src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContextFactory.cs (247 lines of code) (raw):

// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Azure.WebJobs.Host.Dispatch; using Microsoft.Azure.WebJobs.Host.Indexers; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Loggers; using Microsoft.Azure.WebJobs.Host.Properties; using Microsoft.Azure.WebJobs.Host.Protocols; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.Timers; using Microsoft.Azure.WebJobs.Host.Triggers; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Microsoft.Azure.WebJobs.Host.Executors { internal class JobHostContextFactory : IJobHostContextFactory { private readonly IFunctionExecutor _functionExecutor; private readonly IFunctionIndexProvider _functionIndexProvider; private readonly ITriggerBindingProvider _triggerBindingProvider; private readonly SingletonManager _singletonManager; private readonly IJobActivator _activator; private readonly IHostIdProvider _hostIdProvider; private readonly INameResolver _nameResolver; private readonly IExtensionRegistry _extensions; private readonly ILoggerFactory _loggerFactory; private readonly IWebJobsExceptionHandler _exceptionHandler; private readonly SharedQueueHandler _sharedQueueHandler; private readonly IOptions<JobHostOptions> _jobHostOptions; private readonly IHostInstanceLogger _hostInstanceLogger; private readonly IFunctionInstanceLogger _functionInstanceLogger; private readonly IFunctionOutputLogger _functionOutputLogger; private readonly IConverterManager _converterManager; private readonly IAsyncCollector<FunctionInstanceLogEntry> _eventCollector; private readonly IDashboardLoggingSetup _dashboardLoggingSetup; private readonly IScaleMonitorManager _monitorManager; private readonly IDrainModeManager _drainModeManager; private readonly IApplicationLifetime _applicationLifetime; private readonly ITargetScalerManager _targetScalerManager; private readonly IEnumerable<IListenerDecorator> _listenerDecorators; public JobHostContextFactory( IDashboardLoggingSetup dashboardLoggingSetup, IFunctionExecutor functionExecutor, IFunctionIndexProvider functionIndexProvider, ITriggerBindingProvider triggerBindingProvider, SingletonManager singletonManager, IJobActivator activator, IHostIdProvider hostIdProvider, INameResolver nameResolver, IExtensionRegistry extensions, ILoggerFactory loggerFactory, IWebJobsExceptionHandler exceptionHandler, SharedQueueHandler sharedQueueHandler, IOptions<JobHostOptions> jobHostOptions, IHostInstanceLogger hostInstanceLogger, IFunctionInstanceLogger functionInstanceLogger, IFunctionOutputLogger functionOutputLogger, IConverterManager converterManager, IAsyncCollector<FunctionInstanceLogEntry> eventCollector, IScaleMonitorManager monitorManager, IDrainModeManager drainModeManager, IApplicationLifetime applicationLifetime, ITargetScalerManager targetScalerManager, IEnumerable<IListenerDecorator> listenerDecorators) { _dashboardLoggingSetup = dashboardLoggingSetup; _functionExecutor = functionExecutor; _functionIndexProvider = functionIndexProvider; _triggerBindingProvider = triggerBindingProvider; _singletonManager = singletonManager; _activator = activator; _hostIdProvider = hostIdProvider; _nameResolver = nameResolver; _extensions = extensions; _loggerFactory = loggerFactory; _exceptionHandler = exceptionHandler; _sharedQueueHandler = sharedQueueHandler; _jobHostOptions = jobHostOptions; _hostInstanceLogger = hostInstanceLogger; _functionInstanceLogger = functionInstanceLogger; _functionOutputLogger = functionOutputLogger; _converterManager = converterManager; _eventCollector = eventCollector; _monitorManager = monitorManager; _drainModeManager = drainModeManager; _applicationLifetime = applicationLifetime; _targetScalerManager = targetScalerManager; _listenerDecorators = listenerDecorators; } public async Task<JobHostContext> Create(JobHost host, CancellationToken shutdownToken, CancellationToken cancellationToken) { shutdownToken.Register(() => { // when a shutdown is triggered we want to stop the application, to ensure the host // shuts down gracefully _applicationLifetime.StopApplication(); }); using (CancellationTokenSource combinedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, shutdownToken)) { CancellationToken combinedCancellationToken = combinedCancellationSource.Token; await WriteSiteExtensionManifestAsync(combinedCancellationToken); // TODO: FACAVAL: Chat with Brettsam, this should probably be moved out of here. _loggerFactory.AddProvider(new FunctionOutputLoggerProvider()); IFunctionIndex functions = await _functionIndexProvider.GetAsync(combinedCancellationToken); Action listenersCreatedCallback = () => { // only trigger HostInitialized after all listeners are created (but before // they are started). host.OnHostInitialized(); }; IListenerFactory functionsListenerFactory = new HostListenerFactory(functions.ReadAll(), _loggerFactory, _monitorManager, _targetScalerManager, _listenerDecorators, listenersCreatedCallback, _drainModeManager); string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken); bool dashboardLoggingEnabled = _dashboardLoggingSetup.Setup(functions, functionsListenerFactory, out IFunctionExecutor hostCallExecutor, out IListener listener, out HostOutputMessage hostOutputMessage, hostId, shutdownToken); if (dashboardLoggingEnabled) { // Publish this to Azure logging account so that a web dashboard can see it. await LogHostStartedAsync(functions, hostOutputMessage, _hostInstanceLogger, combinedCancellationToken); } if (_functionExecutor is FunctionExecutor executor) { executor.HostOutputMessage = hostOutputMessage; } IEnumerable<FunctionDescriptor> descriptors = functions.ReadAllDescriptors(); int descriptorsCount = descriptors.Count(); ILogger startupLogger = _loggerFactory?.CreateLogger(LogCategories.Startup); if (_jobHostOptions.Value.UsingDevelopmentSettings) { startupLogger?.LogDebug("Development settings applied"); } if (descriptorsCount == 0) { startupLogger?.LogWarning($"No job functions found. Try making your job classes and methods public. {Resource.ExtensionInitializationMessage}"); } else { StringBuilder functionsTrace = new StringBuilder(); functionsTrace.AppendLine("Found the following functions:"); foreach (FunctionDescriptor descriptor in descriptors) { functionsTrace.AppendLine(descriptor.FullName); } string msg = functionsTrace.ToString(); startupLogger?.LogInformation(msg); } return new JobHostContext( functions, hostCallExecutor, listener, _eventCollector, _loggerFactory); } } [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] internal static IListener CreateHostListener(IListenerFactory allFunctionsListenerFactory, SharedQueueHandler sharedQueue, IRecurrentCommand heartbeatCommand, IWebJobsExceptionHandler exceptionHandler, CancellationToken shutdownToken) { IListener factoryListener = new ListenerFactoryListener(allFunctionsListenerFactory, sharedQueue); IListener heartbeatListener = new HeartbeatListener(heartbeatCommand, exceptionHandler, factoryListener); IListener shutdownListener = new ShutdownListener(shutdownToken, heartbeatListener); return shutdownListener; } private static Task LogHostStartedAsync(IFunctionIndex functionIndex, HostOutputMessage hostOutputMessage, IHostInstanceLogger logger, CancellationToken cancellationToken) { IEnumerable<FunctionDescriptor> functions = functionIndex.ReadAllDescriptors(); HostStartedMessage message = new HostStartedMessage { HostInstanceId = hostOutputMessage.HostInstanceId, HostDisplayName = hostOutputMessage.HostDisplayName, SharedQueueName = hostOutputMessage.SharedQueueName, InstanceQueueName = hostOutputMessage.InstanceQueueName, Heartbeat = hostOutputMessage.Heartbeat, WebJobRunIdentifier = hostOutputMessage.WebJobRunIdentifier, Functions = functions }; return logger.LogHostStartedAsync(message, cancellationToken); } internal static Assembly GetHostAssembly(IEnumerable<MethodInfo> methods) { // 1. Try to get the assembly name from the first method. MethodInfo firstMethod = methods.FirstOrDefault(); if (firstMethod != null) { return firstMethod.DeclaringType.Assembly; } // 2. If there are no function definitions, try to use the entry assembly. Assembly entryAssembly = Assembly.GetEntryAssembly(); if (entryAssembly != null) { return entryAssembly; } // 3. If there's no entry assembly either, we don't have anything to use. return null; } private static async Task WriteSiteExtensionManifestAsync(CancellationToken cancellationToken) { string jobDataPath = Environment.GetEnvironmentVariable(WebSitesKnownKeyNames.JobDataPath); if (jobDataPath == null) { // we're not in Azure Web Sites, bye bye. return; } const string Filename = "WebJobsSdk.marker"; var path = Path.Combine(jobDataPath, Filename); const int DefaultBufferSize = 4096; try { using (Stream stream = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, DefaultBufferSize, useAsync: true)) using (TextWriter writer = new StreamWriter(stream)) { // content is not really important, this would help debugging though cancellationToken.ThrowIfCancellationRequested(); await writer.WriteAsync(DateTime.UtcNow.ToString("s") + "Z"); await writer.FlushAsync(); } } catch (Exception ex) { if (ex is UnauthorizedAccessException || ex is IOException) { // simultaneous access error or an error caused by some other issue // ignore it and skip marker creation } else { throw; } } } internal static IFunctionExecutor CreateHostCallExecutor(IListenerFactory instanceQueueListenerFactory, IRecurrentCommand heartbeatCommand, IWebJobsExceptionHandler exceptionHandler, CancellationToken shutdownToken, IFunctionExecutor innerExecutor) { IFunctionExecutor heartbeatExecutor = new HeartbeatFunctionExecutor(heartbeatCommand, exceptionHandler, innerExecutor); IFunctionExecutor abortListenerExecutor = new AbortListenerFunctionExecutor(instanceQueueListenerFactory, heartbeatExecutor); IFunctionExecutor shutdownFunctionExecutor = new ShutdownFunctionExecutor(shutdownToken, abortListenerExecutor); return shutdownFunctionExecutor; } internal class DataOnlyHostOutputMessage : HostOutputMessage { internal override void AddMetadata(IDictionary<string, string> metadata) { throw new NotSupportedException(); } } } }