using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading; using JetBrains.Collections; using JetBrains.Collections.Viewable; using JetBrains.Diagnostics; using JetBrains.Lifetimes; using JetBrains.Rd.Base; using JetBrains.Serialization; using JetBrains.Util; namespace JetBrains.Rd.Impl { /// /// This class handles RdContext on protocol level. It tracks existing contexts and allows access to their value sets (when present) /// public class ProtocolContexts : RdReactiveBase { private readonly CopyOnWriteList myCounterpartHandlers = new(); private readonly CopyOnWriteList myHandlersToWrite = new(); private readonly IViewableList myHandlerOrder = new ViewableList(); private readonly ConcurrentDictionary myHandlersMap = new(); private readonly object myOrderingLock = new(); private readonly ThreadLocal mySendWithoutContexts = new(() => false); internal readonly struct SendWithoutContextsCookie : IDisposable { private readonly ProtocolContexts myContexts; private readonly bool myPrevValue; public SendWithoutContextsCookie(ProtocolContexts contexts) { myContexts = contexts; myPrevValue = contexts.mySendWithoutContexts.Value; contexts.mySendWithoutContexts.Value = true; } public void Dispose() { myContexts.mySendWithoutContexts.Value = myPrevValue; } } private readonly SerializationCtx mySerializationCtx; internal SendWithoutContextsCookie CreateSendWithoutContextsCookie() => new SendWithoutContextsCookie(this); public bool IsSendWithoutContexts => mySendWithoutContexts.Value; public ProtocolContexts(SerializationCtx serializationCtx) { Async = true; mySerializationCtx = serializationCtx; } public ICollection RegisteredContexts => myHandlersMap.Keys; internal ISingleContextHandler GetHandlerForContext(RdContext context) { return (ISingleContextHandler) myHandlersMap[context]; } public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper) { var contextBase = RdContextBase.Read(mySerializationCtx, reader); contextBase.RegisterOn(this); myCounterpartHandlers.Add(myHandlersMap[contextBase]); } private void DoAddHandler(RdContext context, ISingleContextHandler handler) { if (myHandlersMap.TryAdd(context, handler)) { context.RegisterOn(mySerializationCtx.Serializers); lock (myOrderingLock) myHandlerOrder.Add(handler); } } private void PreBindHandler(Lifetime lifetime, string key, ISingleContextHandler handler) { if (handler is RdBindableBase bindableHandler) { bindableHandler.RdId = RdId.Mix(key); bindableHandler.PreBind(lifetime, this, key); } } private void BindHandler(ISingleContextHandler handler) { if (handler is RdBindableBase bindableHandler) { using (CreateSendWithoutContextsCookie()) bindableHandler.Bind(); } } private void SendContextToRemote(RdContextBase context) { var wire = TryGetProto()?.Wire; if (wire == null) return; using(CreateSendWithoutContextsCookie()) wire.Send(RdId, writer => { RdContextBase.Write(mySerializationCtx, writer, context); }); } private void EnsureHeavyHandlerExists(RdContext context) { if (Mode.IsAssertion) Assertion.Assert(context.IsHeavy, "key.IsHeavy"); if (!myHandlersMap.ContainsKey(context)) DoAddHandler(context, new HeavySingleContextHandler(context, this)); } private void EnsureLightHandlerExists(RdContext context) { if (Mode.IsAssertion) Assertion.Assert(!context.IsHeavy, "!key.IsHeavy"); if (!myHandlersMap.ContainsKey(context)) DoAddHandler(context, new LightSingleContextHandler(context)); } /// /// Get a value set for a given key. The values are local relative to transform /// public IAppendOnlyViewableConcurrentSet GetValueSet(RdContext context) where T : notnull { if (Mode.IsAssertion) Assertion.Assert(context.IsHeavy, "Only heavy keys have value sets, key {0} is light", context.Key); return ((HeavySingleContextHandler) GetHandlerForContext(context)).LocalValueSet; } /// /// Registers a context to be used with this context handler. Must be invoked on protocol's scheduler /// public void RegisterContext(RdContext context) { if (myHandlersMap.ContainsKey(context)) return; if(context.IsHeavy) EnsureHeavyHandlerExists(context); else EnsureLightHandlerExists(context); } protected override void PreInit(Lifetime lifetime, IProtocol proto) { base.PreInit(lifetime, proto); lock (myOrderingLock) { myHandlerOrder.View(lifetime, (handlerLt, _, handler) => { PreBindHandler(handlerLt, handler.ContextBase.Key, handler); }); } proto.Wire.Advise(lifetime, this); } protected override void Init(Lifetime lifetime, IProtocol proto, SerializationCtx ctx) { base.Init(lifetime, proto, ctx); lock (myOrderingLock) { myHandlerOrder.View(lifetime, (handlerLt, _, handler) => { BindAndSendHandler(handler); }); } } /// /// Reads context values from a message, sets current context to them, and returns a cookie to restore previous context /// internal MessageContext ReadContextsIntoCookie(UnsafeReader reader) { var numContextValues = reader.ReadShort(); if (numContextValues == 0) return default; var handlers = myCounterpartHandlers; if (Mode.IsAssertion) Assertion.Assert(numContextValues <= handlers.Count, "We know of {0} other side keys, received {1} instead", handlers.Count, numContextValues); var values = new object[numContextValues]; for (var i = 0; i < numContextValues; i++) values[i] = handlers[i].ReadValueBoxed(mySerializationCtx, reader); return new MessageContext(values, handlers.GetStorageUnsafe()); } internal readonly ref struct MessageContextCookie { private readonly IDisposable[] myDisposables; public MessageContextCookie(IDisposable[] disposables) { myDisposables = disposables; } public void Dispose() { if (myDisposables is { } disposables) { foreach (var disposable in disposables) disposable.Dispose(); } } } internal readonly struct MessageContext { private readonly object[] myValues; private readonly ISingleContextHandler[] myHandlers; public MessageContext(object[] values, ISingleContextHandler[] handlers) { myValues = values; myHandlers = handlers; } public MessageContextCookie UpdateCookie() { if (myHandlers == null) return default; var disposables = new IDisposable[myValues.Length]; for (var i = 0; i < myValues.Length; i++) disposables[i] = myHandlers[i].ContextBase.UpdateValueBoxed(myValues[i]); return new MessageContextCookie(disposables); } } /// /// Writes the current context values /// [SuppressMessage("ReSharper", "InconsistentlySynchronizedField", Justification = "sync is for atomicity of write/send pairs, not access")] public void WriteContexts(UnsafeWriter writer) { if (IsSendWithoutContexts) { WriteEmptyContexts(writer); return; } // all handlers in myHandlersToWrite have been sent to the remote side var count = myHandlersToWrite.Count; writer.WriteInt16((short) count); for (var i = 0; i < count; i++) myHandlersToWrite[i].WriteValue(mySerializationCtx, writer); } /// /// Adds current values of registered contexts to their respective value sets without writing them to the wire /// [SuppressMessage("ReSharper", "InconsistentlySynchronizedField", Justification = "sync is for atomicity of write/send pairs, not access")] public void RegisterCurrentValuesInValueSets() { var count = myHandlerOrder.Count; for (var i = 0; i < count; i++) myHandlerOrder[i].RegisterValueInValueSet(); } /// /// Writes an empty context /// public static void WriteEmptyContexts(UnsafeWriter writer) { writer.WriteInt16((short) 0); } private void BindAndSendHandler(ISingleContextHandler handler) { SendContextToRemote(handler.ContextBase); BindHandler(handler); // add the handler to myHandlersToWrite only after sending the context to remote myHandlersToWrite.Add(handler); } } }