in src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs [850:918]
/// <summary>
/// Converts <paramref name="context"/> into an RPC invocation request and sends it to the worker
/// as a streaming message.
/// </summary>
/// <remarks>
/// Nothing is sent to the worker when:
/// <list type="bullet">
/// <item><description>the function has an entry in <c>_functionLoadErrors</c> or <c>_metadataRequestErrors</c>;
/// the stored exception is set on <c>context.ResultSource</c> instead.</description></item>
/// <item><description>cancellation was already requested and either the worker does not handle invocation-cancel
/// messages or <c>SendCanceledInvocationsToWorker</c> is disabled; <c>context.ResultSource</c> is cancelled instead.</description></item>
/// </list>
/// Exceptions raised while building or sending the request are not propagated to the caller; they are
/// reported through <c>context.ResultSource</c>, and an invocation that this call had already registered
/// in <c>_executingInvocations</c> is removed again so a failed invocation does not remain tracked.
/// </remarks>
/// <param name="context">The invocation to send; supplies the function metadata, cancellation token,
/// property bag and the result source that is completed on the early-exit and failure paths.</param>
/// <returns>A task that completes when the request has been handed to the streaming channel, or when the
/// invocation was completed locally without being sent.</returns>
internal async Task SendInvocationRequest(ScriptInvocationContext context)
{
    // Key under which THIS call registered the invocation in _executingInvocations. It stays null until
    // TryAdd succeeds, so the catch block only ever cleans up an entry that this call created.
    string trackedInvocationId = null;

    try
    {
        string invocationId = context.ExecutionContext.InvocationId.ToString();
        string functionId = context.FunctionMetadata.GetFunctionId();

        // do not send an invocation request for functions that failed to load or could not be indexed by the worker
        if (_functionLoadErrors.TryGetValue(functionId, out Exception exception))
        {
            _workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name);
            context.ResultSource.TrySetException(exception);
            RemoveExecutingInvocation(invocationId);
            return;
        }
        else if (_metadataRequestErrors.TryGetValue(functionId, out exception))
        {
            _workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name);
            context.ResultSource.TrySetException(exception);
            RemoveExecutingInvocation(invocationId);
            return;
        }

        if (context.CancellationToken.IsCancellationRequested)
        {
            _workerChannelLogger.LogDebug("Cancellation was requested prior to the invocation request ('{invocationId}') being sent to the worker.", invocationId);

            // If the worker does not support handling InvocationCancel grpc messages, or if cancellation is supported and the customer opts-out
            // of sending cancelled invocations to the worker, we will cancel the result source and not send the invocation to the worker.
            if (!_isHandlesInvocationCancelMessageCapabilityEnabled || !JobHostOptions.Value.SendCanceledInvocationsToWorker)
            {
                _workerChannelLogger.LogInformation("Cancelling invocation '{invocationId}' due to cancellation token being signaled. "
                    + "This invocation was not sent to the worker. Read more about this here: https://aka.ms/azure-functions-cancellations", invocationId);

                // This will result in an invocation failure with a "FunctionInvocationCanceled" exception.
                context.ResultSource.TrySetCanceled();
                return;
            }
        }

        var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager);
        AddAdditionalTraceContext(invocationRequest, context);

        // Register the invocation before anything is sent. Remember the key only when the entry was
        // actually added, so a later failure never removes an entry that belongs to a different call.
        if (_executingInvocations.TryAdd(invocationRequest.InvocationId, new(context, _messageDispatcherFactory.Create(invocationRequest.InvocationId))))
        {
            trackedInvocationId = invocationRequest.InvocationId;
        }

        _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: Sanitizer.Sanitize(context.FunctionMetadata.Name));

        // If the worker supports HTTP proxying, ensure this request is forwarded prior
        // to sending the invocation request to the worker, as this will ensure the context
        // is properly set up.
        if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction())
        {
            _httpProxyService.StartForwarding(context, _httpProxyEndpoint);
        }

        await SendStreamingMessageAsync(new StreamingMessage
        {
            InvocationRequest = invocationRequest
        });

        if (_isHandlesInvocationCancelMessageCapabilityEnabled)
        {
            // Registered only after the request has been sent; the registration is stored on the
            // context's property bag under ScriptConstants.CancellationTokenRegistration.
            var cancellationCtr = context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
            context.Properties.Add(ScriptConstants.CancellationTokenRegistration, cancellationCtr);
        }
    }
    catch (Exception invokeEx)
    {
        // Surface the failure to whoever awaits the invocation result instead of throwing to the caller.
        context.ResultSource.TrySetException(invokeEx);

        // The invocation has failed, so stop tracking it if this call had registered it. This mirrors
        // the fault-then-remove order used by the load/metadata error paths above.
        if (trackedInvocationId is not null)
        {
            RemoveExecutingInvocation(trackedInvocationId);
        }
    }
}