in rd-net/RdFramework/Impl/RdMap.cs [203:299]
/// <summary>
/// Handles a single incoming wire message for this map: either an ACK for a versioned change,
/// or an add/update/remove of one key.
/// </summary>
/// <remarks>
/// The header, optional version, key and optional value are read from <paramref name="stream"/> immediately;
/// the resulting state change is performed inside the callback handed to <paramref name="dispatchHelper"/>.
/// </remarks>
/// <param name="proto">Protocol whose wire is used to send an ACK back for versioned messages.</param>
/// <param name="ctx">Serialization context passed to the key/value read and write delegates.</param>
/// <param name="stream">Reader the message is decoded from: header, version (versioned messages only), key, value (add/update only).</param>
/// <param name="dispatchHelper">Provides the lifetime used for this message and dispatches the handler callback.</param>
public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader stream, IRdWireableDispatchHelper dispatchHelper)
{
  // Header layout: bits below versionedFlagShift carry the operation type,
  // any bit at or above it marks the message as versioned.
  var header = stream.ReadInt();
  var msgVersioned = (header >> versionedFlagShift) != 0;
  var opType = header & ((1 << versionedFlagShift) - 1);

  // The version is only present on the wire for versioned messages.
  var version = msgVersioned ? stream.ReadLong() : 0L;

  var key = ReadKeyDelegate(ctx, stream);
  var lifetime = dispatchHelper.Lifetime;

  if (opType == Ack)
  {
    dispatchHelper.Dispatch(() =>
    {
      lock (myPendingForAck)
      {
        // Validate the ACK; the first failing check determines the reported error.
        string? error = null;

        if (!msgVersioned) error = "Received ACK while msg hasn't versioned flag set";
        else if (!IsMaster) error = "Received ACK when not a Master";
        else if (!myPendingForAck.TryGetValue(key, out var pendingVersion)) error = "No pending for ACK";
        else if (pendingVersion < version)
          error = $"Pending version `{pendingVersion}` < ACK version `{version}`";
        // Good scenario
        else if (pendingVersion == version)
          myPendingForAck.Remove(key);
        //else do nothing, silently drop (the pending version is greater than the ACK'ed one)

        // Errors are always logged; successful ACKs only when trace logging is enabled.
        var isError = !string.IsNullOrEmpty(error);
        if (ourLogReceived.IsTraceEnabled() || isError)
        {
          ourLogReceived.LogFormat(isError ? LoggingLevel.ERROR : LoggingLevel.TRACE,
            "{0} :: ACK :: key = {1} :: version = {2}{3}", this, key.PrintToString(), version,
            isError ? " >> " + error : "");
        }
      }
    });
  }
  else
  {
    var kind = (AddUpdateRemove)opType;
    var isPut = kind is AddUpdateRemove.Add or AddUpdateRemove.Update;

    // A value follows the key only for Add/Update; for any other operation it stays default.
    var value = isPut ? ReadValueDelegate(ctx, stream) : default;
    var definition = TryPreBindValue(lifetime, key, value, true);

    ReceiveTrace?.Log($"OnWireReceived:: {getMessage(msgVersioned, version, isPut, value)}");

    dispatchHelper.Dispatch(() =>
    {
      // An unversioned change is ignored only when this side is the master and still has a change
      // for the same key pending for ACK; every other change is applied.
      if (msgVersioned || !IsMaster || !IsPendingForAck(key))
      {
        ReceiveTrace?.Log($"Dispatched:: {getMessage(msgVersioned, version, isPut, value)}");

        if (isPut)
        {
          if (TryGetBindDefinitions(lifetime) is { } definitions)
          {
            // Use TryGetValue (like the remove branch below) rather than the indexer, so an Update
            // for a key that has no recorded definition does not fail on the lookup.
            if (kind == AddUpdateRemove.Update && definitions.TryGetValue(key, out var replacedDefinition))
              replacedDefinition?.Terminate();

            definitions[key] = definition;
          }

          myMap[key] = value!;
        }
        else
        {
          if (TryGetBindDefinitions(lifetime) is { } definitions && definitions.TryGetValue(key, out var prevDefinition))
          {
            prevDefinition?.Terminate();
            definitions.Remove(key);
          }

          myMap.Remove(key);
        }
      }
      else
      {
        // NOTE(review): `definition` created above is neither stored nor terminated on this path —
        // confirm it is cleaned up together with `lifetime`.
        ReceiveTrace?.Log($">> CHANGE IGNORED {getMessage(msgVersioned, version, isPut, value)}");
      }

      // Every versioned message is acknowledged, whether or not the change was applied.
      if (msgVersioned)
      {
        proto.Wire.Send(RdId, innerWriter =>
        {
          innerWriter.WriteInt32((1 << versionedFlagShift) | Ack);
          innerWriter.WriteInt64(version);
          WriteKeyDelegate.Invoke(ctx, innerWriter, key);

          SendTrace?.Log($"{this} :: ACK :: key = {key.PrintToString()} :: version = {version}");
        });

        // Receiving a versioned message while being the master is reported as a configuration error.
        if (IsMaster)
          ourLogReceived.Error("Both ends are masters: {0}", Location);
      }
    });
  }
}