in src/WebJobs.Script.Grpc/Extensions/ScriptInvocationContextExtensions.cs [24:128]
public static async Task<InvocationRequest> ToRpcInvocationRequest(this ScriptInvocationContext context, ILogger logger, GrpcCapabilities capabilities, bool isSharedMemoryDataTransferEnabled, ISharedMemoryManager sharedMemoryManager)
{
bool excludeHttpTriggerMetadata = !string.IsNullOrEmpty(capabilities.GetCapabilityState(RpcWorkerConstants.RpcHttpTriggerMetadataRemoved));
var invocationRequest = new InvocationRequest
{
FunctionId = context.FunctionMetadata.GetFunctionId(),
InvocationId = context.ExecutionContext.InvocationId.ToString(),
TraceContext = GetRpcTraceContext(context.Traceparent, context.Tracestate, context.Attributes, logger),
};
SetRetryContext(context, invocationRequest);
var rpcValueCache = new Dictionary<object, TypedData>();
Dictionary<object, RpcSharedMemory> sharedMemValueCache = null;
StringBuilder logBuilder = null;
bool usedSharedMemory = false;
if (isSharedMemoryDataTransferEnabled)
{
sharedMemValueCache = new Dictionary<object, RpcSharedMemory>();
logBuilder = new StringBuilder();
}
foreach (var input in context.Inputs)
{
RpcSharedMemory sharedMemValue = null;
ParameterBinding parameterBinding = null;
if (isSharedMemoryDataTransferEnabled)
{
// Try to transfer this data over shared memory instead of RPC
if (input.Val == null || !sharedMemValueCache.TryGetValue(input.Val, out sharedMemValue))
{
sharedMemValue = await input.Val.ToRpcSharedMemoryAsync(input.Type, logger, invocationRequest.InvocationId, sharedMemoryManager);
if (input.Val != null)
{
sharedMemValueCache.Add(input.Val, sharedMemValue);
}
}
}
if (sharedMemValue != null)
{
// Data was successfully transferred over shared memory; create a ParameterBinding accordingly
parameterBinding = new ParameterBinding
{
Name = input.Name,
RpcSharedMemory = sharedMemValue
};
usedSharedMemory = true;
logBuilder.AppendFormat("{0}:{1},", input.Name, sharedMemValue.Count);
}
else
{
if (!TryConvertObjectIfNeeded(input.Val, logger, out object val))
{
// Conversion did not take place, keep the existing value as it is
val = input.Val;
}
// Data was not transferred over shared memory (either disabled, type not supported or some error); resort to RPC
TypedData rpcValue = null;
if (val == null || !rpcValueCache.TryGetValue(val, out rpcValue))
{
rpcValue = await val.ToRpc(logger, capabilities);
if (input.Val != null)
{
rpcValueCache.Add(val, rpcValue);
}
}
parameterBinding = new ParameterBinding
{
Name = input.Name,
Data = rpcValue
};
}
invocationRequest.InputData.Add(parameterBinding);
}
foreach (var pair in context.BindingData)
{
if (ShouldSkipBindingData(pair, context, excludeHttpTriggerMetadata))
{
continue;
}
if (!rpcValueCache.TryGetValue(pair.Value, out TypedData rpcValue))
{
rpcValue = await pair.Value.ToRpc(logger, capabilities);
rpcValueCache.Add(pair.Value, rpcValue);
}
invocationRequest.TriggerMetadata.Add(pair.Key, rpcValue);
}
if (usedSharedMemory)
{
logger.LogDebug("Shared memory usage for request of invocation Id: {Id} is {SharedMemoryUsage}", invocationRequest.InvocationId, logBuilder.ToString());
}
return invocationRequest;
}