public override void OnWireReceived()

in rd-net/RdFramework/Impl/RdMap.cs [203:299]


    public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader stream, IRdWireableDispatchHelper dispatchHelper)
    {
      var header = stream.ReadInt();
      var msgVersioned = (header >> versionedFlagShift) != 0;
      var opType = header & ((1 << versionedFlagShift) - 1);

      var version = msgVersioned ? stream.ReadLong() : 0L;

      var key = ReadKeyDelegate(ctx, stream);

      var lifetime = dispatchHelper.Lifetime;
      if (opType == Ack)
      {
        dispatchHelper.Dispatch(() =>
        {
          lock (myPendingForAck)
          {
            string? error = null;
            if (!msgVersioned) error = "Received ACK while msg hasn't versioned flag set";
            else if (!IsMaster) error = "Received ACK when not a Master";
            else if (!myPendingForAck.TryGetValue(key, out var pendingVersion)) error = "No pending for ACK";
            else if (pendingVersion < version)
              error = $"Pending version `{pendingVersion}` < ACK version `{version}`";
            // Good scenario
            else if (pendingVersion == version)
              myPendingForAck.Remove(key);
            //else do nothing, silently drop
            var isError = !string.IsNullOrEmpty(error);
            if (ourLogReceived.IsTraceEnabled() || isError)
            {
              ourLogReceived.LogFormat(isError ? LoggingLevel.ERROR : LoggingLevel.TRACE,
                "{0}  :: ACK :: key = {1} :: version = {2}{3}", this, key.PrintToString(), version,
                isError ? " >> " + error : "");
            }
          }
        });
      }
      else
      {
        var kind = (AddUpdateRemove)opType;
        var isPut = kind is AddUpdateRemove.Add or AddUpdateRemove.Update;
        var value = isPut ? ReadValueDelegate(ctx, stream) : default;
        var definition = TryPreBindValue(lifetime, key, value, true);

              ReceiveTrace?.Log($"OnWireReceived:: {getMessage(msgVersioned, version, isPut, value)}");

dispatchHelper.Dispatch(() =>
        {
          if (msgVersioned || !IsMaster || !IsPendingForAck(key))
          {
            ReceiveTrace?.Log($"Dispatched:: {getMessage(msgVersioned, version, isPut, value)}");

            if (isPut)
            {
              if (TryGetBindDefinitions(lifetime) is { } definitions)
              {
                if (kind == AddUpdateRemove.Update)
                  definitions[key]?.Terminate();

                definitions[key] = definition;
              }

              myMap[key] = value!;
}
            else
            {
              if (TryGetBindDefinitions(lifetime) is { } definitions && definitions.TryGetValue(key, out var prevDefinition))
              {
                prevDefinition?.Terminate();
                definitions.Remove(key);
}

              myMap.Remove(key);
            }
          }
          else
          {
            ReceiveTrace?.Log($">> CHANGE IGNORED {getMessage(msgVersioned, version, isPut, value)}");
          }

          if (msgVersioned)
          {
            proto.Wire.Send(RdId, innerWriter =>
            {
              innerWriter.WriteInt32((1 << versionedFlagShift) | Ack);
              innerWriter.WriteInt64(version);
              WriteKeyDelegate.Invoke(ctx, innerWriter, key);

              SendTrace?.Log($"{this} :: ACK :: key = {key.PrintToString()} :: version = {version}");
            });

            if (IsMaster)
              ourLogReceived.Error("Both ends are masters: {0}", Location);
          }
        });
      }
    }