rd-net/RdFramework/Tasks/RdCall.cs (195 lines of code) (raw):
using System;
using System.Diagnostics;
using JetBrains.Annotations;
using JetBrains.Collections.Viewable;
using JetBrains.Diagnostics;
using JetBrains.Lifetimes;
using JetBrains.Rd.Base;
using JetBrains.Rd.Impl;
using JetBrains.Serialization;
namespace JetBrains.Rd.Tasks
{
public class RdCall<TReq, TRes> : RdReactiveBase, IRdCall<TReq, TRes>, IRdEndpoint<TReq, TRes>
{
[PublicAPI]
public CtxReadDelegate<TReq> ReadRequestDelegate { get; }
[PublicAPI]
public CtxWriteDelegate<TReq> WriteRequestDelegate { get; }
[PublicAPI]
public CtxReadDelegate<TRes> ReadResponseDelegate { get; }
[PublicAPI]
public CtxWriteDelegate<TRes> WriteResponseDelegate { get; }
//set via Set method
[PublicAPI] public Func<Lifetime, TReq, RdTask<TRes>>? Handler { get; private set; }
private Lifetime myBindLifetime;
private IScheduler? myCancellationScheduler;
private IScheduler? myHandlerScheduler;
public RdCall(CtxReadDelegate<TReq> readRequest, CtxWriteDelegate<TReq> writeRequest, CtxReadDelegate<TRes> readResponse, CtxWriteDelegate<TRes> writeResponse)
{
ReadRequestDelegate = readRequest;
WriteRequestDelegate = writeRequest;
ReadResponseDelegate = readResponse;
WriteResponseDelegate = writeResponse;
myBindLifetime = Lifetime.Terminated;
}
protected override void PreInit(Lifetime lifetime, IProtocol proto)
{
base.PreInit(lifetime, proto);
Assertion.Assert(myBindLifetime.Status == LifetimeStatus.Terminated);
myBindLifetime = lifetime;
proto.Wire.Advise(lifetime, this);
}
public void SetRdTask(Func<Lifetime, TReq, RdTask<TRes>> handler, IScheduler? cancellationScheduler = null, IScheduler? handlerScheduler = null)
{
Handler = handler;
myCancellationScheduler = cancellationScheduler;
myHandlerScheduler = handlerScheduler;
}
[Obsolete("This is an internal API. It is preferable to use SetSync or SetAsync extension methods")]
public void Set(Func<Lifetime, TReq, RdTask<TRes>> handler, IScheduler? cancellationScheduler = null, IScheduler? handlerScheduler = null)
{
SetRdTask(handler, cancellationScheduler, handlerScheduler);
}
[PublicAPI]
public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper)
{
var taskId = RdId.Read(reader);
var wiredTask = new WiredRdTask<TReq, TRes>.Endpoint(dispatchHelper.Lifetime, this, taskId, myCancellationScheduler ?? SynchronousScheduler.Instance);
try
{
OnWireReceived(ctx, reader, wiredTask, dispatchHelper);
}
catch (Exception e)
{
wiredTask.Set(e);
}
}
private void OnWireReceived(SerializationCtx ctx, UnsafeReader reader, WiredRdTask<TReq, TRes>.Endpoint wiredTask, IRdWireableDispatchHelper dispatchHelper)
{
var externalCancellation = wiredTask.Lifetime;
var value = ReadRequestDelegate(ctx, reader);
ReceiveTrace?.Log($"OnWireReceived:: {wiredTask} :: received request: {value.PrintToString()}");
dispatchHelper.Dispatch(myHandlerScheduler, () =>
{
ReceiveTrace?.Log($"Dispatched:: {wiredTask} :: received request: {value.PrintToString()}");
var rdTask = RunHandler(value, externalCancellation, wiredTask);
rdTask.Result.Advise(Lifetime.Eternal, result =>
{
try
{
if (result.Status == RdTaskStatus.Success)
AssertNullability(result.Result);
wiredTask.ResultInternal.SetIfEmpty(result);
}
catch (Exception ee)
{
ourLogSend.Error($"Problem when responding to `{wiredTask}`", ee);
wiredTask.Set(new RdFault(ee));
}
});
});
}
private RdTask<TRes> RunHandler(TReq value, Lifetime externalCancellation, object? moniker)
{
RdTask<TRes> rdTask;
try
{
var handler = Handler;
if (handler == null)
{
var message = $"Handler is not set for {moniker} :: received request: {value.PrintToString()}";
ourLogReceived.Error(message);
rdTask = RdTask.Faulted<TRes>(new Exception(message));
}
else
{
try
{
rdTask = handler(externalCancellation, value);
}
catch (Exception ex)
{
rdTask = RdTask.Faulted<TRes>(ex);
}
}
}
catch (Exception e)
{
rdTask = RdTask.Faulted<TRes>(new Exception($"Unexpected exception in {moniker}", e));
}
return rdTask;
}
public TRes Sync(TReq request, RpcTimeouts? timeouts = null)
{
AssertBound();
if (!Async) AssertThreading();
var task = StartInternal(Lifetime.Eternal, request, SynchronousScheduler.Instance);
var stopwatch = new Stopwatch();
stopwatch.Start();
var timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts);
if (!task.Wait(timeoutsToUse.WarnAwaitTime))
{
var deltaAwaitTime = timeoutsToUse.ErrorAwaitTime - timeoutsToUse.WarnAwaitTime;
var res = deltaAwaitTime > TimeSpan.Zero && task.Wait(deltaAwaitTime);
stopwatch.Stop();
if (!res)
throw new TimeoutException($"Sync execution of rpc `{Location}` is timed out in {timeoutsToUse.ErrorAwaitTime.TotalMilliseconds} ms, the freeze time is {stopwatch.ElapsedMilliseconds} ms");
Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms, the freeze time: {2} ms", Location, timeoutsToUse.WarnAwaitTime.TotalMilliseconds, stopwatch.ElapsedMilliseconds);
}
return task.Result.Value.Unwrap();
}
public IRdTask<TRes> Start(TReq request, IScheduler? responseScheduler = null)
=> StartInternal(Lifetime.Eternal, request, responseScheduler);
public IRdTask<TRes> Start(Lifetime lifetime, TReq request, IScheduler? responseScheduler = null)
=> StartInternal(lifetime, request, responseScheduler);
private IRdTask<TRes> StartInternal(Lifetime requestLifetime, TReq request, IScheduler? scheduler)
{
var proto = TryGetProto();
if (!Async)
AssertBound();
AssertThreading();
AssertNullability(request);
if (proto == null || !TryGetSerializationContext(out var serializationContext))
return new WiredRdTask<TReq, TRes>.CallSite(Lifetime.Terminated, this, RdId.Nil, SynchronousScheduler.Instance);
// Short-circuit of calls on local wires. On a local protocol with stub wire the handler will
// never be executed, so we call it right now explicitly in sync mode.
if (proto.Wire.IsStub)
return RunHandler(request, requestLifetime, moniker: this);
var taskId = proto.Identities.Next(RdId.Nil);
var task = CreateCallSite(requestLifetime, (lifetime) => new WiredRdTask<TReq, TRes>.CallSite(lifetime, this, taskId, scheduler ?? proto.Scheduler));
using var cookie = task.Lifetime.UsingExecuteIfAlive();
if (cookie.Succeed)
{
proto.Wire.Send(RdId, (writer) =>
{
SendTrace?.Log($"{task} :: send request: {request.PrintToString()}");
taskId.Write(writer);
WriteRequestDelegate(serializationContext, writer, request);
});
}
return task;
}
private WiredRdTask<TReq, TRes>.CallSite CreateCallSite(Lifetime requestLifetime, Func<Lifetime, WiredRdTask<TReq, TRes>.CallSite> createTask)
{
if (requestLifetime.IsEternal)
return createTask(myBindLifetime);
var intersectedDef = Lifetime.DefineIntersection(requestLifetime, myBindLifetime);
var task = createTask(intersectedDef.Lifetime);
task.Result.Advise(intersectedDef.Lifetime, result =>
{
if (result.Status != RdTaskStatus.Success || !result.Result.IsBindable())
{
intersectedDef.AllowTerminationUnderExecution = true;
intersectedDef.Terminate();
}
});
return task;
}
public static RdCall<TReq, TRes> Read(SerializationCtx ctx, UnsafeReader reader, CtxReadDelegate<TReq> readRequest, CtxWriteDelegate<TReq> writeRequest, CtxReadDelegate<TRes> readResponse, CtxWriteDelegate<TRes> writeResponse)
{
return new RdCall<TReq, TRes>(readRequest, writeRequest, readResponse, writeResponse).WithId(reader.ReadRdId());
}
public static void Write(SerializationCtx ctx, UnsafeWriter writer, RdCall<TReq, TRes> value)
{
value.RdId.Write(writer);
}
protected override string ShortName => Handler == null ? "call" : "endpoint";
}
}