in src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs [255:367]
/// <summary>
/// Prepares the dispatcher for the supplied functions: resolves the worker runtime and its
/// worker configuration, derives the process startup/restart/shutdown intervals, and then
/// either reuses already-initialized webhost language worker channels or starts new
/// jobhost language worker processes.
/// </summary>
/// <param name="functions">
/// Metadata of the functions to load. May be <c>null</c> or empty, in which case no worker
/// processes are started.
/// </param>
/// <param name="cancellationToken">
/// Checked once on entry; it is not passed on to the operations awaited by this method.
/// </param>
/// <returns>A task that completes when initialization has been kicked off.</returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was already cancelled on entry.
/// </exception>
/// <exception cref="InvalidOperationException">
/// No worker config matches the resolved worker runtime, no functions were supplied, the host
/// is not a multi-language runtime environment, and the runtime is not dotnet-isolated.
/// </exception>
public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default)
{
    cancellationToken.ThrowIfCancellationRequested();

    var placeholderModeEnabled = _environment.IsPlaceholderModeEnabled();
    _logger.LogDebug($"Placeholder mode is enabled: {placeholderModeEnabled}");

    if (placeholderModeEnabled)
    {
        return;
    }

    _workerRuntime ??= Utility.GetWorkerRuntime(functions, _environment);

    // In case of multi language runtime, _workerRuntime has no significance, thus skipping this check for multi language runtime environment
    if ((string.IsNullOrEmpty(_workerRuntime) || _workerRuntime.Equals(RpcWorkerConstants.DotNetLanguageWorkerName, StringComparison.InvariantCultureIgnoreCase)) && !_environment.IsMultiLanguageRuntimeEnvironment())
    {
        // Shutdown any placeholder channels for empty function apps or dotnet function apps.
        // This is needed as specialization does not kill standby placeholder channels if worker runtime is not set.
        // Debounce to ensure this does not affect cold start
        _shutdownStandbyWorkerChannels();
        return;
    }

    var workerConfig = _workerConfigs.FirstOrDefault(c => c.Description.Language.Equals(_workerRuntime, StringComparison.InvariantCultureIgnoreCase));

    // For other OOP workers, workerconfigs are present inside "workers" folder of host bin directory and is used to populate "_workerConfigs".
    // For dotnet-isolated _workerConfigs is populated by reading workerconfig.json from the deployed payload(customer function app code).
    // So if workerConfig is null and worker runtime is dotnet-isolated, that means isolated function code was not deployed yet.
    var isDotNetIsolatedAppWithoutPayload = string.Equals(_workerRuntime, RpcWorkerConstants.DotNetIsolatedLanguageWorkerName, StringComparison.InvariantCultureIgnoreCase)
        && workerConfig == null;

    // Probe the (possibly lazy) sequence once and reuse the answer for both checks below.
    bool hasFunctions = functions != null && functions.Any();

    // We are skipping this check for multi-language environments because they use multiple workers and thus doesn't honor 'FUNCTIONS_WORKER_RUNTIME'
    // Also, skip if dotnet-isolated app without payload as it is a valid case to exist.
    if (workerConfig == null && !hasFunctions
        && !_environment.IsMultiLanguageRuntimeEnvironment()
        && !isDotNetIsolatedAppWithoutPayload)
    {
        // Throw only when workerConfig is null AND no functions were supplied.
        // When functions are present a missing config is tolerated here:
        // with .NET out-of-proc, worker config comes from functions.
        var allLanguageNamesFromWorkerConfigs = string.Join(",", _workerConfigs.Select(c => c.Description.Language));
        _logger.LogDebug($"Languages present in WorkerConfig: {allLanguageNamesFromWorkerConfigs}");

        throw new InvalidOperationException($"WorkerConfig for runtime: {_workerRuntime} not found");
    }

    if (!hasFunctions)
    {
        // Do not initialize the function dispatcher when there are no functions.
        _logger.LogDebug($"{nameof(RpcFunctionInvocationDispatcher)} received no functions");
        return;
    }

    _functions = functions;

    // Read once so the interval selection and the worker start-up path below
    // act on the same decision for this initialization.
    bool isMultiLanguageRuntime = _environment.IsMultiLanguageRuntimeEnvironment();

    if (isMultiLanguageRuntime)
    {
        // Several workers run side by side; use the largest interval any of them asks for.
        _processStartupInterval = _workerConfigs.Max(wc => wc.CountOptions.ProcessStartupInterval);
        _restartWait = _workerConfigs.Max(wc => wc.CountOptions.ProcessRestartInterval);
        _shutdownTimeout = _workerConfigs.Max(wc => wc.CountOptions.ProcessShutdownTimeout);
    }
    else
    {
        // workerConfig may be null here (functions were supplied but no config matched),
        // so fall back to the defaults when either the config or its CountOptions is missing.
        _processStartupInterval = workerConfig?.CountOptions?.ProcessStartupInterval ?? _defaultProcessStartupInterval;
        _restartWait = workerConfig?.CountOptions?.ProcessRestartInterval ?? _defaultProcessRestartInterval;
        _shutdownTimeout = workerConfig?.CountOptions?.ProcessShutdownTimeout ?? _defaultProcessShutdownInterval;
    }

    ErrorEventsThreshold = 3 * await _maxProcessCount.Value;

    if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs) || isMultiLanguageRuntime)
    {
        State = FunctionInvocationDispatcherState.Initializing;

        IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
        if (webhostLanguageWorkerChannels != null)
        {
            int workerProcessCount = 0;

            // Iterate over a snapshot of the keys and re-check each one with TryGetValue
            // before awaiting its channel.
            foreach (string workerId in webhostLanguageWorkerChannels.Keys.ToList())
            {
                if (webhostLanguageWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> initializedLanguageWorkerChannelTask))
                {
                    _logger.LogDebug("Found initialized language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
                    try
                    {
                        IRpcWorkerChannel initializedLanguageWorkerChannel = await initializedLanguageWorkerChannelTask.Task;
                        initializedLanguageWorkerChannel.SetupFunctionInvocationBuffers(_functions);
                        initializedLanguageWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value, _scriptOptions.FunctionTimeout);
                        ++workerProcessCount;
                    }
                    catch (Exception ex)
                    {
                        // A failed channel is shut down and not counted; the remaining channels are still processed.
                        _logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
                        await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
                    }
                }
            }

            // Start indexing at the number of channels that were successfully reused.
            StartWorkerProcesses(workerProcessCount, InitializeWebhostLanguageWorkerChannel, true, _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime));
        }
        else
        {
            if (isMultiLanguageRuntime)
            {
                var workerLanguagesToStart = functions.Select(function => function.Language).Distinct();
                StartWorkerProcesses(startIndex: 0, startAction: InitializeJobhostLanguageWorkerChannelAsync, functionLanguages: workerLanguagesToStart);
            }
            else
            {
                StartWorkerProcesses(0, InitializeJobhostLanguageWorkerChannelAsync);
            }
        }
    }

    AddLogUserCategory(functions);
}