rd-net/RdFramework/Impl/InternRoot.cs (210 lines of code) (raw):
using System;
using System.Collections.Concurrent;
using System.Threading;
using JetBrains.Annotations;
using JetBrains.Collections.Viewable;
using JetBrains.Diagnostics;
using JetBrains.Lifetimes;
using JetBrains.Rd.Base;
using JetBrains.Rd.Util;
using JetBrains.Serialization;
using JetBrains.Util.Util;
#nullable disable
namespace JetBrains.Rd.Impl
{
public class InternRoot<TBase> : IInternRoot<TBase>, IRdWireable
{
private readonly ConcurrentDictionary<InternId, TBase> myDirectMap = new();
private readonly ConcurrentDictionary<TBase, IdPair> myInverseMap = new();
private int myInternedIdCounter;
private readonly CtxReadDelegate<TBase> myReadDelegate;
private readonly CtxWriteDelegate<TBase> myWriteDelegate;
public InternId TryGetInterned(TBase value)
{
var hasValue = myInverseMap.TryGetValue(value, out var pair);
if (hasValue)
return pair.Id;
return InternId.Invalid;
}
public InternId Intern(TBase value)
{
var proto = TryGetProto();
if (proto == null || !TryGetSerializationContext(out var serializationCtx))
return InternId.Invalid;
if (myInverseMap.TryGetValue(value, out var pair))
return pair.Id;
pair.Id = pair.ExtraId = InternId.Invalid;
if (myInverseMap.TryAdd(value, pair))
{
InternId allocatedId = new InternId(Interlocked.Increment(ref myInternedIdCounter) * 2);
Assertion.Assert(allocatedId.IsLocal, "Newly allocated ID must be local");
RdReactiveBase.SendTrace?.Log($"InternRoot `{Location}` ({RdId}):: {allocatedId} = {value}");
myDirectMap[allocatedId] = value;
using(proto.Contexts.CreateSendWithoutContextsCookie())
proto.Wire.Send(RdId, writer =>
{
myWriteDelegate(serializationCtx, writer, value);
InternId.Write(writer, allocatedId);
});
while (true)
{
var oldPair = myInverseMap[value];
var modifiedPair = oldPair;
modifiedPair.Id = allocatedId;
if (myInverseMap.TryUpdate(value, modifiedPair, oldPair)) break;
}
}
return myInverseMap[value].Id;
}
private object TryGetValue(InternId id)
{
myDirectMap.TryGetValue(id, out var value);
return value;
}
public bool TryUnIntern<T>(InternId id, out T result) where T : TBase
{
var value = TryGetValue(id);
if (value != null)
{
result = (T) value;
return true;
}
result = default;
return false;
}
public void Remove(TBase value)
{
if (myInverseMap.TryRemove(value, out var pair))
{
myDirectMap.TryRemove(pair.Id, out _);
myDirectMap.TryRemove(pair.ExtraId, out _);
}
}
public T UnIntern<T>(InternId id) where T : TBase
{
return (T) TryGetValue(id);
}
[CanBeNull] private IRdDynamic myParent;
public InternRoot([CanBeNull] CtxReadDelegate<TBase> readDelegate = null, [CanBeNull] CtxWriteDelegate<TBase> writeDelegate = null)
{
myReadDelegate = readDelegate ?? Polymorphic<TBase>.Read;
myWriteDelegate = writeDelegate ?? Polymorphic<TBase>.Write;
}
public IProtocol TryGetProto() => myParent?.TryGetProto();
public bool TryGetSerializationContext(out SerializationCtx ctx)
{
var parent = myParent;
if (parent != null)
return parent.TryGetSerializationContext(out ctx);
ctx = default;
return default;
}
public RName Location { get; private set; } = new RName("<<not bound>>");
public void Print(PrettyPrinter printer)
{
printer.Print(ToString());
printer.Print("(");
printer.Print(RdId.ToString());
printer.Print(")");
}
public override string ToString()
{
return GetType().ToString(false, true) + ": `" + Location + "`";
}
public RdId RdId { get; set; }
public bool IsBound => myParent != null;
public void PreBind(Lifetime lf, IRdDynamic parent, string name)
{
if (myParent != null)
{
Assertion.Fail($"Trying to bound already bound {this} to {parent.Location}");
}
//todo uncomment when fix InterningTest
//Assertion.Require(RdId != RdId.Nil, "Must be identified first");
var proto = parent.TryGetProto();
if (proto == null)
return;
lf.TryBracket(() =>
{
myParent = parent;
Location = parent.Location.Sub(name);
myDirectMap.Clear();
myInverseMap.Clear();
},
() =>
{
Location = Location.Sub("<<unbound>>", "::");
myParent = null;
RdId = RdId.Nil;
}
);
proto.Wire.Advise(lf, this);
}
public void Bind()
{
}
public void Identify(IIdentities identities, RdId id)
{
Assertion.Require(RdId.IsNil, "Already has RdId: {0}, entity: {1}", RdId, this);
Assertion.Require(!id.IsNil, "Assigned RdId mustn't be null, entity: {0}", this);
RdId = id;
}
public bool Async
{
get => true;
set => throw new NotSupportedException("Intern Roots are always async");
}
public void OnWireReceived(UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper)
{
if (!TryGetSerializationContext(out var serializationCtx))
{
RdReactiveBase.ourLogReceived.Trace($"{this} is not bound. Message for ({dispatchHelper.RdId}) will not be processed");
return;
}
var value = myReadDelegate(serializationCtx, reader);
var id = InternId.Read(reader);
Assertion.Require(!id.IsLocal, "Other side sent us id of our own side?");
Assertion.Require(id.IsValid, "Other side sent us invalid id?");
RdReactiveBase.ReceiveTrace?.Log($"InternRoot `{Location}` ({RdId}):: {id} = {value}");
myDirectMap[id] = value;
var pair = new IdPair { Id = id, ExtraId = InternId.Invalid };
if (!myInverseMap.TryAdd(value, pair))
{
while (true)
{
var oldPair = myInverseMap[value];
Assertion.Assert(!oldPair.ExtraId.IsValid, "Remote send duplicated IDs for value {0}", value);
var modifiedPair = oldPair;
modifiedPair.ExtraId = id;
if (myInverseMap.TryUpdate(value, modifiedPair, oldPair)) break;
}
}
}
private struct IdPair : IEquatable<IdPair>
{
public InternId Id;
public InternId ExtraId;
public bool Equals(IdPair other)
{
return Id.Equals(other.Id) && ExtraId.Equals(other.ExtraId);
}
public override bool Equals(object obj)
{
return obj is IdPair other && Equals(other);
}
public override int GetHashCode()
{
unchecked
{
return (Id.GetHashCode() * 397) ^ ExtraId.GetHashCode();
}
}
public static bool operator ==(IdPair left, IdPair right)
{
return left.Equals(right);
}
public static bool operator !=(IdPair left, IdPair right)
{
return !left.Equals(right);
}
}
}
}