rd-net/RdFramework/Impl/RdMap.cs (403 lines of code) (raw):
using System;
using System.Collections;
using System.Collections.Generic;
using JetBrains.Annotations;
using JetBrains.Collections;
using JetBrains.Collections.Synchronized;
using JetBrains.Collections.Viewable;
using JetBrains.Diagnostics;
using JetBrains.Lifetimes;
using JetBrains.Rd.Base;
using JetBrains.Rd.Util;
using JetBrains.Serialization;
// ReSharper disable InconsistentNaming
namespace JetBrains.Rd.Impl
{
public class RdMap<K, V> : RdReactiveBase, IViewableMap<K, V> where K : notnull
{
private readonly ViewableMap<K, V> myMap = new(new SynchronizedDictionary<K, V>()/*to have thread safe print*/);
public RdMap(CtxReadDelegate<K> readKey, CtxWriteDelegate<K> writeKey, CtxReadDelegate<V> readValue, CtxWriteDelegate<V> writeValue)
{
ValueCanBeNull = false;
ReadKeyDelegate = readKey;
WriteKeyDelegate = writeKey;
ReadValueDelegate = readValue;
WriteValueDelegate = writeValue;
}
#region Serializers
[PublicAPI]
public CtxReadDelegate<K> ReadKeyDelegate { get; private set; }
[PublicAPI]
public CtxWriteDelegate<K> WriteKeyDelegate { get; private set; }
[PublicAPI]
public CtxReadDelegate<V> ReadValueDelegate { get; private set; }
[PublicAPI]
public CtxWriteDelegate<V> WriteValueDelegate { get; private set; }
[PublicAPI]
public static RdMap<K, V> Read(SerializationCtx ctx, UnsafeReader reader)
{
return Read(ctx, reader, Polymorphic<K>.Read, Polymorphic<K>.Write, Polymorphic<V>.Read, Polymorphic<V>.Write);
}
[PublicAPI]
public static RdMap<K,V> Read(SerializationCtx ctx, UnsafeReader reader, CtxReadDelegate<K> readKey, CtxWriteDelegate<K> writeKey, CtxReadDelegate<V> readValue, CtxWriteDelegate<V> writeValue)
{
var id = reader.ReadRdId();
return new RdMap<K, V>(readKey, writeKey, readValue, writeValue).WithId(id);
}
[PublicAPI]
public static void Write(SerializationCtx ctx, UnsafeWriter writer, RdMap<K, V> value)
{
Assertion.Assert(!value.RdId.IsNil);
writer.Write(value.RdId);
}
#endregion
#region Versions
private const int versionedFlagShift = 8;
private const int Ack = (int)AddUpdateRemove.Remove + 1;
public bool IsMaster = false;
private long myNextVersion;
private readonly Dictionary<K, long> myPendingForAck = new Dictionary<K, long>();
#endregion
#region Init
public bool OptimizeNested { [PublicAPI] get; set; }
private volatile SynchronizedDictionary<K, LifetimeDefinition?>? myBindDefinitions;
protected override void PreInit(Lifetime lifetime, IProtocol proto)
{
base.PreInit(lifetime, proto);
if (!OptimizeNested)
{
var definitions = new SynchronizedDictionary<K, LifetimeDefinition?>(myMap.Count);
foreach (var (key, value) in this)
{
if (value != null)
{
value.IdentifyPolymorphic(proto.Identities, proto.Identities.Next(RdId));
var definition = TryPreBindValue(lifetime, key, value, false);
if (definition != null)
definitions.Add(key, definition);
}
}
using var cookie = lifetime.UsingExecuteIfAlive();
if (cookie.Succeed)
{
Assertion.Assert(myBindDefinitions == null);
myBindDefinitions = definitions;
}
else
return;
}
proto.Wire.Advise(lifetime, this);
}
protected override void Init(Lifetime lifetime, IProtocol proto, SerializationCtx ctx)
{
base.Init(lifetime, proto, ctx);
if (!OptimizeNested)
{
Change.Advise(lifetime, it =>
{
AssertNullability(it.Key);
if (it.Kind != AddUpdateRemove.Remove) AssertNullability(it.NewValue);
if (IsLocalChange)
{
var definitions = TryGetBindDefinitions(lifetime);
if (definitions == null)
return;
if (it.Kind != AddUpdateRemove.Add)
definitions[it.Key]?.Terminate();
if (it.Kind == AddUpdateRemove.Remove)
definitions.Remove(it.Key);
if (it.Kind != AddUpdateRemove.Remove)
{
it.NewValue.IdentifyPolymorphic(proto.Identities, proto.Identities.Next(RdId));
var definition = TryPreBindValue(lifetime, it.Key, it.NewValue, false);
definitions[it.Key] = definition;
}
}
});
}
using (UsingLocalChange())
{
Advise(lifetime, it =>
{
if (!IsLocalChange) return;
proto.Wire.Send(RdId, SendContext.Of(ctx, it, this), static (sendContext, stream) =>
{
var sContext = sendContext.SzrCtx;
var evt = sendContext.Event;
var me = sendContext.This;
var versionedFlag = me.IsMaster ? 1 << versionedFlagShift : 0;
stream.WriteInt32(versionedFlag | (int)evt.Kind);
var version = ++me.myNextVersion;
if (me.IsMaster)
{
lock(me.myPendingForAck)
me.myPendingForAck[evt.Key] = version;
stream.WriteInt64(version);
}
me.WriteKeyDelegate(sContext, stream, evt.Key);
if (evt.IsUpdate || evt.IsAdd)
{
me.WriteValueDelegate(sContext, stream, evt.NewValue);
}
SendTrace?.Log($"{me} :: {evt.Kind} :: key = {evt.Key.PrintToString()}"
+ (me.IsMaster ? " :: version = " + version : "")
+ (evt.Kind != AddUpdateRemove.Remove ? " :: value = " + evt.NewValue.PrintToString() : ""));
});
if (!OptimizeNested)
it.NewValue.BindPolymorphic();
});
}
}
protected override void Unbind()
{
base.Unbind();
myBindDefinitions = null;
}
private static string getMessage(bool msgVersioned, long version, bool isPut, V? value)
{
return "{this} :: {kind} :: key = {key.PrintToString()}" +
(msgVersioned ? " :: version = " + version : "") +
(isPut ? $" :: value = {value.PrintToString()}" : "");
}
public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader stream, IRdWireableDispatchHelper dispatchHelper)
{
var header = stream.ReadInt();
var msgVersioned = (header >> versionedFlagShift) != 0;
var opType = header & ((1 << versionedFlagShift) - 1);
var version = msgVersioned ? stream.ReadLong() : 0L;
var key = ReadKeyDelegate(ctx, stream);
var lifetime = dispatchHelper.Lifetime;
if (opType == Ack)
{
dispatchHelper.Dispatch(() =>
{
lock (myPendingForAck)
{
string? error = null;
if (!msgVersioned) error = "Received ACK while msg hasn't versioned flag set";
else if (!IsMaster) error = "Received ACK when not a Master";
else if (!myPendingForAck.TryGetValue(key, out var pendingVersion)) error = "No pending for ACK";
else if (pendingVersion < version)
error = $"Pending version `{pendingVersion}` < ACK version `{version}`";
// Good scenario
else if (pendingVersion == version)
myPendingForAck.Remove(key);
//else do nothing, silently drop
var isError = !string.IsNullOrEmpty(error);
if (ourLogReceived.IsTraceEnabled() || isError)
{
ourLogReceived.LogFormat(isError ? LoggingLevel.ERROR : LoggingLevel.TRACE,
"{0} :: ACK :: key = {1} :: version = {2}{3}", this, key.PrintToString(), version,
isError ? " >> " + error : "");
}
}
});
}
else
{
var kind = (AddUpdateRemove)opType;
var isPut = kind is AddUpdateRemove.Add or AddUpdateRemove.Update;
var value = isPut ? ReadValueDelegate(ctx, stream) : default;
var definition = TryPreBindValue(lifetime, key, value, true);
ReceiveTrace?.Log($"OnWireReceived:: {getMessage(msgVersioned, version, isPut, value)}");
dispatchHelper.Dispatch(() =>
{
if (msgVersioned || !IsMaster || !IsPendingForAck(key))
{
ReceiveTrace?.Log($"Dispatched:: {getMessage(msgVersioned, version, isPut, value)}");
if (isPut)
{
if (TryGetBindDefinitions(lifetime) is { } definitions)
{
if (kind == AddUpdateRemove.Update)
definitions[key]?.Terminate();
definitions[key] = definition;
}
myMap[key] = value!;
}
else
{
if (TryGetBindDefinitions(lifetime) is { } definitions && definitions.TryGetValue(key, out var prevDefinition))
{
prevDefinition?.Terminate();
definitions.Remove(key);
}
myMap.Remove(key);
}
}
else
{
ReceiveTrace?.Log($">> CHANGE IGNORED {getMessage(msgVersioned, version, isPut, value)}");
}
if (msgVersioned)
{
proto.Wire.Send(RdId, innerWriter =>
{
innerWriter.WriteInt32((1 << versionedFlagShift) | Ack);
innerWriter.WriteInt64(version);
WriteKeyDelegate.Invoke(ctx, innerWriter, key);
SendTrace?.Log($"{this} :: ACK :: key = {key.PrintToString()} :: version = {version}");
});
if (IsMaster)
ourLogReceived.Error("Both ends are masters: {0}", Location);
}
});
}
}
private SynchronizedDictionary<K, LifetimeDefinition?>? TryGetBindDefinitions(Lifetime lifetime)
{
var definitions = myBindDefinitions;
return lifetime.IsAlive ? definitions : null;
}
private bool IsPendingForAck(K key)
{
lock (myPendingForAck)
return myPendingForAck.ContainsKey(key);
}
private LifetimeDefinition? TryPreBindValue(Lifetime lifetime, K key, V? value, bool bindAlso)
{
if (OptimizeNested || !value.IsBindable())
return null;
var definition = new LifetimeDefinition { Id = value };
try
{
value.PreBindPolymorphic(definition.Lifetime, this, "["+key+"]");
if (bindAlso)
value.BindPolymorphic();
lifetime.Definition.Attach(definition, true);
return definition;
}
catch
{
definition.Terminate();
throw;
}
}
#endregion
#region Read delegation
IEnumerator IEnumerable.GetEnumerator()
{
return myMap.GetEnumerator();
}
public IEnumerator<KeyValuePair<K, V>> GetEnumerator()
{
return myMap.GetEnumerator();
}
public bool Contains(KeyValuePair<K, V> item)
{
return myMap.Contains(item);
}
public void CopyTo(KeyValuePair<K, V>[] array, int arrayIndex)
{
myMap.CopyTo(array, arrayIndex);
}
public int Count => myMap.Count;
public bool IsReadOnly => myMap.IsReadOnly;
public bool ContainsKey(K key)
{
return myMap.ContainsKey(key);
}
public bool TryGetValue(K key, out V value)
{
return myMap.TryGetValue(key, out value);
}
public ICollection<K> Keys => myMap.Keys;
public ICollection<V> Values => myMap.Values;
public ISource<MapEvent<K, V>> Change => myMap.Change;
#endregion
#region Write delegation
//todo async!
public void Add(K key, V value)
{
AssertNullability(value);
using (UsingLocalChange())
{
myMap.Add(key, value);
}
}
public bool Remove(K key)
{
using (UsingLocalChange())
{
return myMap.Remove(key);
}
}
public V this[K key]
{
get => myMap[key];
set
{
AssertNullability(value);
using (UsingLocalChange())
{
myMap[key] = value;
}
}
}
public bool Remove(KeyValuePair<K, V> item)
{
using (UsingLocalChange())
{
return myMap.Remove(item);
}
}
public void Add(KeyValuePair<K, V> item)
{
AssertNullability(item.Value);
using (UsingLocalChange())
{
myMap.Add(item);
}
}
public void Clear()
{
using (UsingLocalChange())
{
myMap.Clear();
}
}
#endregion
public void Advise(Lifetime lifetime, Action<MapEvent<K, V>> handler)
{
if (IsBound) AssertThreading();
using (UsingDebugInfo())
{
myMap.Advise(lifetime, handler);
}
}
public override RdBindableBase? FindByRName(RName rName)
{
var rootName = rName.GetNonEmptyRoot();
var localName = rootName.LocalName.ToString();
if (!localName.StartsWith("[") || !localName.EndsWith("]"))
return null;
var stringKey = localName.Substring(1, localName.Length - 2);
foreach (var (key, value) in myMap)
{
if (key.ToString() != stringKey)
continue;
if (!(value is RdBindableBase bindableValue))
break;
if (rootName == rName)
return bindableValue;
return bindableValue.FindByRName(rName.DropNonEmptyRoot());
}
return null;
}
protected override string ShortName => "map";
public override void Print(PrettyPrinter printer)
{
base.Print(printer);
if (!printer.PrintContent) return;
printer.Print(" [");
if (Count > 0) printer.Println();
using (printer.IndentCookie())
{
foreach (var kv in this)
{
kv.Key.PrintEx(printer);
printer.Print(" => ");
kv.Value.PrintEx(printer);
printer.Println();
}
}
printer.Println("]");
}
}
}