src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs (381 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.AppService.Proxy.Common.Infra;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Workers.Profiles;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.Azure.WebJobs.Script.Workers.Rpc
{
public class WebHostRpcWorkerChannelManager : IWebHostRpcWorkerChannelManager
{
private readonly ILogger _logger = null;
private readonly TimeSpan workerInitTimeout = TimeSpan.FromSeconds(30);
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions = null;
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
private readonly IScriptEventManager _eventManager = null;
private readonly IEnvironment _environment;
private readonly ILoggerFactory _loggerFactory = null;
private readonly IRpcWorkerChannelFactory _rpcWorkerChannelFactory;
private readonly IMetricsLogger _metricsLogger;
private readonly IWorkerProfileManager _profileManager;
private string _workerRuntime;
private Action _shutdownStandbyWorkerChannels;
private IConfiguration _config;
private ConcurrentDictionary<string, ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>>> _workerChannels = new(StringComparer.OrdinalIgnoreCase);
public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager,
IEnvironment environment,
ILoggerFactory loggerFactory,
IRpcWorkerChannelFactory rpcWorkerChannelFactory,
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
IMetricsLogger metricsLogger,
IConfiguration config,
IWorkerProfileManager workerProfileManager,
IOptionsMonitor<LanguageWorkerOptions> languageWorkerOptions,
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
{
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
_config = config ?? throw new ArgumentNullException(nameof(config));
_profileManager = workerProfileManager ?? throw new ArgumentNullException(nameof(workerProfileManager));
_eventManager = eventManager;
_loggerFactory = loggerFactory;
_metricsLogger = metricsLogger;
_rpcWorkerChannelFactory = rpcWorkerChannelFactory;
_logger = loggerFactory.CreateLogger<WebHostRpcWorkerChannelManager>();
_applicationHostOptions = applicationHostOptions;
_hostingConfigOptions = hostingConfigOptions;
languageWorkerOptions.OnChange(async languageWorkerOptions =>
{
IRpcWorkerChannel rpcWorkerChannel = await GetChannelAsync(_workerRuntime);
if (rpcWorkerChannel != null && !UsePlaceholderChannel(rpcWorkerChannel))
{
_logger.LogInformation("Language worker options changed, and the placeholder worker channel is invalid for other reasons (e.g., worker configuration changes). Shutting down the channel.");
await ShutdownChannelIfExistsAsync(_workerRuntime, rpcWorkerChannel.Id);
}
});
_shutdownStandbyWorkerChannels = ScheduleShutdownStandbyChannels;
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
}
public Task<IRpcWorkerChannel> InitializeChannelAsync(IEnumerable<RpcWorkerConfig> workerConfigs, string runtime)
{
_logger?.LogDebug("Initializing language worker channel for runtime:{runtime}", runtime);
return InitializeLanguageWorkerChannel(workerConfigs, runtime, _applicationHostOptions.CurrentValue.ScriptPath);
}
internal async Task<IRpcWorkerChannel> InitializeLanguageWorkerChannel(IEnumerable<RpcWorkerConfig> workerConfigs, string runtime, string scriptRootPath)
{
IRpcWorkerChannel rpcWorkerChannel = null;
string workerId = Guid.NewGuid().ToString();
_logger.LogDebug("Creating language worker channel for runtime:{runtime}", runtime);
try
{
rpcWorkerChannel = _rpcWorkerChannelFactory.Create(scriptRootPath, runtime, _metricsLogger, 0, workerConfigs);
AddOrUpdateWorkerChannels(runtime, rpcWorkerChannel);
await rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(processStartTask =>
{
if (processStartTask.Status == TaskStatus.RanToCompletion)
{
_logger.LogDebug("Adding WebHost language worker channel for runtime: {language}. workerId:{id}", runtime, rpcWorkerChannel.Id);
SetInitializedWorkerChannel(runtime, rpcWorkerChannel);
}
else if (processStartTask.Status == TaskStatus.Faulted)
{
_logger.LogError("Failed to start language worker process for runtime: {language}. workerId:{id}", runtime, rpcWorkerChannel.Id);
SetExceptionOnInitializedWorkerChannel(runtime, rpcWorkerChannel, processStartTask.Exception);
}
});
}
catch (Exception ex)
{
throw new HostInitializationException($"Failed to start Language Worker Channel for language :{runtime}", ex);
}
return rpcWorkerChannel;
}
internal Task<IRpcWorkerChannel> GetChannelAsync(string language)
{
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
{
if (workerChannels.Count > 0 && workerChannels.TryGetValue(workerChannels.Keys.First(), out TaskCompletionSource<IRpcWorkerChannel> valueTask))
{
return valueTask.Task;
}
}
return Task.FromResult<IRpcWorkerChannel>(null);
}
public IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language)
{
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
{
return workerChannels;
}
return null;
}
public async Task SpecializeAsync()
{
_logger.LogInformation("Starting language worker channel specialization");
_workerRuntime = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeSettingName);
IRpcWorkerChannel rpcWorkerChannel = await GetChannelAsync(_workerRuntime);
if (_workerRuntime != null && rpcWorkerChannel != null)
{
bool envReloadRequestResultSuccessful = false;
if (UsePlaceholderChannel(rpcWorkerChannel))
{
_logger.LogDebug("Loading environment variables for runtime: {runtime}", _workerRuntime);
envReloadRequestResultSuccessful = await rpcWorkerChannel.SendFunctionEnvironmentReloadRequest();
}
if (envReloadRequestResultSuccessful == false)
{
_logger.LogDebug("Shutting down placeholder worker. Worker is not compatible for runtime: {runtime}", _workerRuntime);
// If we need to allow file edits, we should shutdown the webhost channel on specialization.
await ShutdownChannelIfExistsAsync(_workerRuntime, rpcWorkerChannel.Id);
}
}
_shutdownStandbyWorkerChannels();
_logger.LogDebug("Completed language worker channel specialization");
}
public async Task WorkerWarmupAsync()
{
_workerRuntime = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeSettingName);
if (_workerRuntime == null)
{
return;
}
IRpcWorkerChannel rpcWorkerChannel = await GetChannelAsync(_workerRuntime);
if (rpcWorkerChannel != null)
{
rpcWorkerChannel.SendWorkerWarmupRequest();
}
}
private bool UsePlaceholderChannel(IRpcWorkerChannel channel)
{
string workerRuntime = channel?.WorkerConfig?.Description?.Language;
if (string.IsNullOrEmpty(workerRuntime))
{
return false;
}
// Restart worker process if custom languageWorkers:[runtime]:arguments are passed in
var workerArguments = _config.GetSection($"{RpcWorkerConstants.LanguageWorkersSectionName}:{workerRuntime}:{WorkerConstants.WorkerDescriptionArguments}").Value;
if (!string.IsNullOrEmpty(workerArguments))
{
return false;
}
if (workerRuntime.Equals(RpcWorkerConstants.DotNetIsolatedLanguageWorkerName, StringComparison.OrdinalIgnoreCase))
{
bool placeholderEnabled = _environment.UsePlaceholderDotNetIsolated();
_logger.LogDebug("UsePlaceholderDotNetIsolated: {placeholderEnabled}", placeholderEnabled);
if (!placeholderEnabled)
{
return false;
}
// We support specialization of dotnet-isolated only on 64bit host process.
if (!_environment.Is64BitProcess)
{
_logger.LogInformation(new EventId(421, ScriptConstants.PlaceholderMissDueToBitnessEventName),
"This app is configured as 32-bit and therefore does not leverage all performance optimizations. See https://aka.ms/azure-functions/dotnet/placeholders for more information.");
return false;
}
// Do not specialize if the placeholder is 6.0 but the site is 7.0 (for example).
var currentWorkerRuntimeVersion = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeVersionSettingName);
channel.WorkerProcess.Process.StartInfo.Environment.TryGetValue(RpcWorkerConstants.FunctionWorkerRuntimeVersionSettingName, out string placeholderWorkerRuntimeVersion);
bool versionMatches = string.Equals(currentWorkerRuntimeVersion, placeholderWorkerRuntimeVersion, StringComparison.OrdinalIgnoreCase);
_logger.LogDebug("Placeholder runtime version: '{placeholderWorkerRuntimeVersion}'. Site runtime version: '{currentWorkerRuntimeVersion}'. Match: {versionMatches}",
placeholderWorkerRuntimeVersion, currentWorkerRuntimeVersion, versionMatches);
return versionMatches;
}
// Special case: node, python and PowerShell apps must be read-only to use the placeholder mode channel
// Also cannot use placeholder worker that is targeting ~3 but has backwards compatibility with V2 enabled
// TODO: Remove special casing when resolving https://github.com/Azure/azure-functions-host/issues/4534
if (string.Equals(workerRuntime, RpcWorkerConstants.NodeLanguageWorkerName, StringComparison.OrdinalIgnoreCase)
|| string.Equals(workerRuntime, RpcWorkerConstants.PowerShellLanguageWorkerName, StringComparison.OrdinalIgnoreCase)
|| string.Equals(workerRuntime, RpcWorkerConstants.PythonLanguageWorkerName, StringComparison.OrdinalIgnoreCase))
{
// Use if readonly and not v2 compatible on ~3 extension
var isReadOnlyAndNoCompat = _applicationHostOptions.CurrentValue.IsFileSystemReadOnly && !_environment.IsV2CompatibleOnV3Extension();
if (!isReadOnlyAndNoCompat)
{
_logger.LogDebug("App will not use placeholder channel - ReadOnly: {isReadOnly}. NoCompat: {noCompat}.", _applicationHostOptions.CurrentValue.IsFileSystemReadOnly, !_environment.IsV2CompatibleOnV3Extension());
}
return isReadOnlyAndNoCompat;
}
// If a profile evaluates to true and was not previously loaded, restart worker process
if (!_profileManager.IsCorrectProfileLoaded(workerRuntime))
{
return false;
}
return true;
}
public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId, Exception workerException = null)
{
if (string.IsNullOrEmpty(language))
{
throw new ArgumentNullException(nameof(language));
}
if (_hostingConfigOptions.Value.RevertWorkerShutdownBehavior)
{
if (_workerChannels.TryRemove(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels))
{
if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value?.Task.ContinueWith(channelTask =>
{
if (channelTask.Status == TaskStatus.Faulted)
{
_logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
}
else
{
IRpcWorkerChannel workerChannel = channelTask.Result;
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
});
return Task.FromResult(true);
}
}
}
else
{
if (_workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels)
&& rpcWorkerChannels.TryRemove(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value?.Task.ContinueWith(channelTask =>
{
if (channelTask.Status == TaskStatus.Faulted)
{
_logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
}
else
{
IRpcWorkerChannel workerChannel = channelTask.Result;
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
});
return Task.FromResult(true);
}
}
return Task.FromResult(false);
}
internal void ScheduleShutdownStandbyChannels()
{
using (_metricsLogger.LatencyEvent(MetricEventNames.SpecializationScheduleShutdownStandbyChannels))
{
_workerRuntime = _workerRuntime ?? _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeSettingName);
if (!string.IsNullOrEmpty(_workerRuntime))
{
var standbyWorkerChannels = _workerChannels.Where(ch => !ch.Key.Equals(_workerRuntime, StringComparison.InvariantCultureIgnoreCase));
foreach (var runtime in standbyWorkerChannels)
{
using (_metricsLogger.LatencyEvent(string.Format(MetricEventNames.SpecializationShutdownStandbyChannels, runtime.Key)))
{
_logger.LogInformation("Disposing standby channel for runtime:{language}", runtime.Key);
if (_workerChannels.TryRemove(runtime.Key, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
{
foreach (string workerId in standbyChannels.Keys)
{
IDisposable latencyEvent = _metricsLogger.LatencyEvent(string.Format(MetricEventNames.SpecializationShutdownStandbyChannel, workerId));
standbyChannels[workerId]?.Task.ContinueWith(channelTask =>
{
if (channelTask.Status == TaskStatus.Faulted)
{
_logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
}
else
{
IRpcWorkerChannel workerChannel = channelTask.Result;
if (workerChannel != null)
{
(channelTask.Result as IDisposable)?.Dispose();
}
}
latencyEvent.Dispose();
});
}
}
}
}
}
}
}
public async Task ShutdownChannelsAsync()
{
foreach (string runtime in _workerChannels.Keys)
{
_logger.LogInformation("Shutting down language worker channels for runtime:{runtime}", runtime);
if (_workerChannels.TryRemove(runtime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
{
foreach (string workerId in standbyChannels.Keys)
{
if (standbyChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> channelTask))
{
IRpcWorkerChannel workerChannel = null;
try
{
workerChannel = await channelTask.Task;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Removing errored worker channel");
}
if (workerChannel is IDisposable disposableWorkerChannel)
{
try
{
disposableWorkerChannel.Dispose();
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Error disposing worker channel");
}
}
}
}
}
}
}
internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)
{
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
_workerChannels.AddOrUpdate(initializedRuntime,
(runtime) =>
{
ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> newLanguageWorkerChannels = new(StringComparer.OrdinalIgnoreCase);
newLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
return newLanguageWorkerChannels;
},
(runtime, existingLanguageWorkerChannels) =>
{
existingLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
return existingLanguageWorkerChannels;
});
}
internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)
{
_logger.LogDebug("Initializing webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
{
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value.SetResult(initializedLanguageWorkerChannel);
}
}
}
internal void SetExceptionOnInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel, Exception exception)
{
_logger.LogDebug("Failed to initialize webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
{
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value.SetException(exception);
}
}
}
}
}