in src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs [247:308]
/// <summary>
/// Shuts down the WebHost worker channel registered under <paramref name="language"/> and
/// <paramref name="workerId"/>, if such an entry exists.
/// </summary>
/// <param name="language">Key of the per-language channel collection to search.</param>
/// <param name="workerId">Key of the channel entry inside the per-language collection.</param>
/// <param name="workerException">
/// Optional exception handed to <c>TryFailExecutions</c> on the channel before it is disposed.
/// </param>
/// <returns>
/// A completed task whose result is <c>true</c> when a matching entry was found, otherwise <c>false</c>.
/// The channel itself is torn down in a continuation that is not awaited, so disposal may still be
/// pending when the returned task is observed.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="language"/> is null or empty.</exception>
public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId, Exception workerException = null)
{
    if (string.IsNullOrEmpty(language))
    {
        throw new ArgumentNullException(nameof(language));
    }

    TaskCompletionSource<IRpcWorkerChannel> channelSource = null;
    bool channelFound;

    if (_hostingConfigOptions.Value.RevertWorkerShutdownBehavior)
    {
        // Reverted (legacy) behavior: the whole language entry is removed from _workerChannels,
        // even when workerId turns out not to be part of it; only the matching channel is disposed.
        channelFound = _workerChannels.TryRemove(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> removedChannels)
            && removedChannels.TryGetValue(workerId, out channelSource);
    }
    else
    {
        // Current behavior: only the entry for workerId is removed; other channels of the
        // same language stay registered.
        channelFound = _workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> languageChannels)
            && languageChannels.TryRemove(workerId, out channelSource);
    }

    if (!channelFound)
    {
        return Task.FromResult(false);
    }

    // Fire-and-forget: the channel may still be initializing, so clean-up runs once its task completes.
    channelSource?.Task.ContinueWith(channelTask =>
    {
        if (channelTask.Status != TaskStatus.RanToCompletion)
        {
            // Faulted or canceled: no channel instance exists to dispose. Reading Result here
            // would throw inside this unobserved continuation, so just log and stop.
            _logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
            return;
        }

        IRpcWorkerChannel workerChannel = channelTask.Result;
        if (workerChannel != null)
        {
            _logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
            workerChannel.TryFailExecutions(workerException);
            (workerChannel as IDisposable)?.Dispose();
        }
    });

    return Task.FromResult(true);
}