in src/StreamJsonRpc/JsonRpc.cs [1773:1914]
private async Task<JsonRpcMessage?> InvokeCoreAsync(JsonRpcRequest request, Type? expectedResultType, CancellationToken cancellationToken)
{
Requires.NotNull(request, nameof(request));
Assumes.NotNull(request.Method);
try
{
using (CancellationTokenExtensions.CombinedCancellationToken cts = this.DisconnectedToken.CombineWith(cancellationToken))
{
if (!request.IsResponseExpected)
{
if (JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Verbose, System.Diagnostics.Tracing.EventKeywords.None))
{
JsonRpcEventSource.Instance.SendingNotification(request.Method, JsonRpcEventSource.GetArgumentsString(request));
}
// IMPORTANT: This should be the first await in this async code path.
// This is crucial to the guarantee that overrides of SendAsync can assume they are executed
// before the first await when a JsonRpc call is made.
await this.SendAsync(request, cts.Token).ConfigureAwait(false);
return default;
}
Verify.Operation(this.readLinesTask is not null, Resources.InvalidBeforeListenHasStarted);
var tcs = new TaskCompletionSource<JsonRpcMessage>();
Action<JsonRpcMessage?> dispatcher = (response) =>
{
lock (this.dispatcherMapLock)
{
this.resultDispatcherMap.Remove(request.RequestId);
}
try
{
if (response is null)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Warning))
{
this.TraceSource.TraceEvent(TraceEventType.Warning, (int)TraceEvents.RequestAbandonedByRemote, "Aborting pending request \"{0}\" because the connection was lost.", request.RequestId);
}
if (JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Warning, System.Diagnostics.Tracing.EventKeywords.None))
{
JsonRpcEventSource.Instance.ReceivedNoResponse(request.RequestId.NumberIfPossibleForEvent);
}
if (cancellationToken.IsCancellationRequested)
{
// Consider lost connection to be result of task canceled and set state to canceled.
tcs.TrySetCanceled(cancellationToken);
}
else
{
tcs.TrySetException(new ConnectionLostException());
}
}
else if (response is JsonRpcError error)
{
if (error.Error is not null && JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Warning, System.Diagnostics.Tracing.EventKeywords.None))
{
JsonRpcEventSource.Instance.ReceivedError(request.RequestId.NumberIfPossibleForEvent, error.Error.Code);
}
if (error.Error?.Code == JsonRpcErrorCode.RequestCanceled)
{
tcs.TrySetCanceled(cancellationToken.IsCancellationRequested ? cancellationToken : CancellationToken.None);
}
else
{
tcs.SetResult(response);
}
}
else
{
if (JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Informational, System.Diagnostics.Tracing.EventKeywords.None))
{
JsonRpcEventSource.Instance.ReceivedResult(request.RequestId.NumberIfPossibleForEvent);
}
tcs.SetResult(response);
}
}
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
#pragma warning restore CA1031 // Do not catch general exception types
{
tcs.TrySetException(ex);
}
};
var callData = new OutstandingCallData(tcs, dispatcher, expectedResultType);
lock (this.dispatcherMapLock)
{
this.resultDispatcherMap.Add(request.RequestId, callData);
}
try
{
if (JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Verbose, System.Diagnostics.Tracing.EventKeywords.None))
{
JsonRpcEventSource.Instance.SendingRequest(request.RequestId.NumberIfPossibleForEvent, request.Method, JsonRpcEventSource.GetArgumentsString(request));
}
// IMPORTANT: This should be the first await in this async code path.
// This is crucial to the guarantee that overrides of SendAsync can assume they are executed
// before the first await when a JsonRpc call is made.
await this.SendAsync(request, cts.Token).ConfigureAwait(false);
}
catch
{
// Since we aren't expecting a response to this request, clear out our memory of it to avoid a memory leak.
lock (this.dispatcherMapLock)
{
this.resultDispatcherMap.Remove(request.RequestId);
}
throw;
}
// Arrange for sending a cancellation message if canceled while we're waiting for a response.
try
{
using (cancellationToken.Register(this.cancelPendingOutboundRequestAction!, request.RequestId, useSynchronizationContext: false))
{
// This task will be completed when the Response object comes back from the other end of the pipe
return await tcs.Task.ConfigureAwait(false);
}
}
finally
{
if (cancellationToken.IsCancellationRequested)
{
this.CancellationStrategy?.OutboundRequestEnded(request.RequestId);
}
}
}
}
catch (OperationCanceledException ex) when (this.DisconnectedToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
throw new ConnectionLostException(Resources.ConnectionDropped, ex);
}
}