in src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs [54:142]
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
{
using CancellationTokenSource cancellationTokenSource = new();
FunctionContext? context = null;
InvocationResponse response = new()
{
InvocationId = request.InvocationId,
Result = new StatusResult()
};
if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
{
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
response.Result.Status = StatusResult.Types.Status.Failure;
response.Result.Exception = exception.ToRpcException();
return response;
}
try
{
var invocation = new GrpcFunctionInvocation(request);
IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
invocationFeatures.Set<FunctionInvocation>(invocation);
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));
if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
{
invocationFeatures.Set<IInputConversionFeature>(conversion!);
}
await _application.InvokeFunctionAsync(context);
var serializer = _workerOptions.Serializer!;
var functionBindings = context.GetBindings();
foreach (var binding in functionBindings.OutputBindingData)
{
var parameterBinding = new ParameterBinding
{
Name = binding.Key
};
if (binding.Value is not null)
{
parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
}
response.OutputData.Add(parameterBinding);
}
if (functionBindings.InvocationResult is not null)
{
TypedData returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
response.ReturnValue = returnVal;
}
response.Result.Status = StatusResult.Types.Status.Success;
}
catch (Exception ex) when (!ex.IsFatal())
{
#pragma warning disable CS0618 // Type or member is obsolete
response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
#pragma warning restore CS0618 // Type or member is obsolete
response.Result.Status = StatusResult.Types.Status.Failure;
if (ex.InnerException is TaskCanceledException or OperationCanceledException)
{
response.Result.Status = StatusResult.Types.Status.Cancelled;
}
}
finally
{
_inflightInvocations.TryRemove(request.InvocationId, out var cts);
if (context is IAsyncDisposable asyncContext)
{
await asyncContext.DisposeAsync();
}
(context as IDisposable)?.Dispose();
}
return response;
}