rd-net/RdFramework/Impl/AsyncProperty.cs (274 lines of code) (raw):

using System; using System.Collections.Generic; using System.ComponentModel; using System.Threading; using JetBrains.Annotations; using JetBrains.Collections.Viewable; using JetBrains.Core; using JetBrains.Diagnostics; using JetBrains.Lifetimes; using JetBrains.Rd.Base; using JetBrains.Rd.Util; using JetBrains.Serialization; namespace JetBrains.Rd.Impl; public class AsyncRdProperty<T> : IRdReactive, IAsyncProperty<T>, INotifyPropertyChanged, ITerminationHandler { #region Constructor public event PropertyChangedEventHandler? PropertyChanged; public AsyncRdProperty(CtxReadDelegate<T> readValue, CtxWriteDelegate<T> writeValue) { ReadValueDelegate = readValue; WriteValueDelegate = writeValue; myProperty = new ViewableProperty<T>(); myProperty.Advise(Lifetime.Eternal, value => { PropertyChanged?.Invoke(this, new PropertyChangedEventArgs("Value")); myChange.Fire(value); // todo }); } [PublicAPI] public AsyncRdProperty(CtxReadDelegate<T> readValue, CtxWriteDelegate<T> writeValue, T defaultValue) : this(readValue, writeValue) { myProperty.Value = defaultValue; } #endregion #region Serializers public CtxReadDelegate<T> ReadValueDelegate { get; private set; } public CtxWriteDelegate<T> WriteValueDelegate { get; private set; } public static AsyncRdProperty<T> Read(SerializationCtx ctx, UnsafeReader reader) { return Read(ctx, reader, Polymorphic<T>.Read, Polymorphic<T>.Write); } public static AsyncRdProperty<T> Read(SerializationCtx ctx, UnsafeReader reader, CtxReadDelegate<T> readValue, CtxWriteDelegate<T> writeValue) { var id = reader.ReadRdId(); var res = new AsyncRdProperty<T>(readValue, writeValue).WithId(id); if (reader.ReadBool()) { res.myProperty.Value = readValue(ctx, reader); } return res; } public static void Write(SerializationCtx ctx, UnsafeWriter writer, AsyncRdProperty<T> value) { value.Write(ctx, writer); } private void Write(SerializationCtx ctx, UnsafeWriter writer) { var bookmark = writer.MakeBookmark(); Maybe<T> maybe; lock (myProperty) maybe = myProperty.Maybe; while (true) { lock (myProperty) { writer.Write(RdId); } if (maybe.HasValue) { writer.WriteBoolean(true); WriteValueDelegate(ctx, writer, myProperty.Value); } else { writer.WriteBoolean(false); } lock (myProperty) { var currentMaybe = myProperty.Maybe; if (maybe.HasValue) { var currentValue = currentMaybe.Value; if (EqualityComparer<T>.Default.Equals(maybe.Value, currentValue)) return; } else { if (!currentMaybe.HasValue) return; } maybe = currentMaybe; } bookmark.Reset(); } } #endregion #region Mastering public bool IsMaster = false; private int myMasterVersion; #endregion #region Init public bool OptimizeNested { get => true; set { } } public bool Async { get => true; set { } } public RdId RdId { get; set; } public RName Location { get; private set; } = RName.Empty; private IRdDynamic? myParent; private Lifetime myBindLifetime = Lifetime.Terminated; private readonly ThreadLocal<bool> myIsLocalChange = new(); public IProtocol? TryGetProto() => myParent?.TryGetProto(); public bool TryGetSerializationContext(out SerializationCtx ctx) { if (myParent is { } parent) return parent.TryGetSerializationContext(out ctx); ctx = default; return false; } public IAsyncSource<T> Change => myChange; public void Identify(IIdentities identities, RdId id) { Assertion.Require(!id.IsNil, $"Assigned RdId mustn't be null, entity: {this}"); lock (myProperty) { Assertion.Require(RdId.IsNil, $"Already has RdId: {RdId}, entity: {this}"); RdId = id; } } public void OnTermination(Lifetime lifetime) { lock (myProperty) { Location = Location.Sub("<<unbound>>", "::"); RdId = RdId.Nil; myParent = null; // BindState = BindState.NotBound; } } public void PreBind(Lifetime lifetime, IRdDynamic parent, string name) { var proto = parent.TryGetProto(); if (proto == null) return; lock (myProperty) { using var cookie = lifetime.UsingExecuteIfAlive(); if (!cookie.Succeed) return; myParent = parent; Location = parent.Location.Sub(name); myBindLifetime = lifetime; lifetime.OnTermination(this); } proto.Wire.Advise(lifetime, this); } public void Bind() { lock (myProperty) { var lifetime = myBindLifetime; var proto = TryGetProto(); if (proto == null || !TryGetSerializationContext(out var ctx)) return; myProperty.Advise(lifetime, v => { if (!myIsLocalChange.Value) return; if (IsMaster) myMasterVersion++; proto.Wire.Send(RdId, SendContext.Of(ctx, v, this), static (sendContext, writer) => { var sContext = sendContext.SzrCtx; var evt = sendContext.Event; var me = sendContext.This; writer.Write(me.myMasterVersion); me.WriteValueDelegate(sContext, writer, evt); // todo don't call it under lock RdReactiveBase.SendTrace?.Log($"{me} :: ver = {me.myMasterVersion}, value = {me.Value.PrintToString()}"); }); }); } } public void OnWireReceived(UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper) { if (!TryGetSerializationContext(out var ctx)) return; var version = reader.ReadInt(); var value = ReadValueDelegate(ctx, reader); lock (myProperty) { if (dispatchHelper.Lifetime.IsNotAlive) return; var rejected = IsMaster && version < myMasterVersion; RdReactiveBase.ReceiveTrace?.Log($"{this} :: oldver = {myMasterVersion}, newver = {version}, value = {value.PrintToString()}{(rejected ? " REJECTED" : "")}"); if (rejected) return; myMasterVersion = version; myProperty.Value = value; } } #endregion public bool ValueCanBeNull { get; set; } private void AssertNullability(T value) { if ( //optimization for memory traffic typeof(T).IsValueType || ValueCanBeNull || value != null) return; Assertion.Fail("Value is defined as not nullable: {0}", this); } #region Api private readonly IViewableProperty<T> myProperty; private readonly AsyncSignal<T> myChange = new(); public Maybe<T> Maybe { get { lock (myProperty) return myProperty.Maybe; } } public T Value { get { lock (myProperty) return myProperty.Value; } set { AssertNullability(value); myIsLocalChange.Value = true; try { lock (myProperty) myProperty.Value = value; } finally { myIsLocalChange.Value = false; } } } public void AdviseOn(Lifetime lifetime, IScheduler scheduler, Action<T> action) { lock (myProperty) { myProperty.Advise(lifetime, value => { scheduler.Queue(() => { action(value); }); }); } } #endregion public void Print(PrettyPrinter printer) { if (!printer.PrintContent) return; lock (myProperty) { printer.Print("(ver=" + myMasterVersion + ") ["); if (Maybe.HasValue) { using (printer.IndentCookie()) { Value.PrintEx(printer); } } else { printer.Print(" <not initialized> "); } printer.Print("]"); } } }