in rd-net/RdFramework/Impl/SocketWire.cs [236:306]
/// <summary>
/// Copies up to <paramref name="size"/> bytes of package payload into <paramref name="buffer"/>.
/// Bytes still pending in <c>myPkg</c> are served first; otherwise package headers are read
/// in a loop until a data package is accepted. PING and ACK packages are handled inside the loop.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which copying starts.</param>
/// <param name="size">Maximum number of bytes to copy.</param>
/// <returns>
/// The number of bytes copied, or 0 when reading a header or a payload reports failure.
/// </returns>
private int ReceiveFromPkgBuffer(byte[] buffer, int offset, int size)
{
  // Expected precondition: size > 0 (not checked here).
  if (myPkg.Available > 0)
  {
    // Leftovers from the previously received package are handed out first.
    var pendingCount = Math.Min(size, myPkg.Available);
    myPkg.MoveTo(buffer, offset, pendingCount);
    return pendingCount;
  }

  for (;;)
  {
    myPkgHeaderBuffer.Clear();
    var headerRead = myPkgHeaderBuffer.Read(ref mySocketBuffer, ReceiveFromSocket);
    if (!headerRead)
    {
      return 0;
    }

    var header = myPkgHeaderBuffer.Data;
    int len = UnsafeReader.ReadInt32FromBytes(header);

    if (len == PING_LEN)
    {
      // A ping carries two Int32 timestamps right after the length field.
      int pingTimestamp = UnsafeReader.ReadInt32FromBytes(header, sizeof(int));
      int pingCounterpartTimestamp = UnsafeReader.ReadInt32FromBytes(header, 2 * sizeof(int));

      myCounterpartTimestamp = pingTimestamp;
      myCounterpartNotionTimestamp = pingCounterpartTimestamp;

      if (!ConnectionEstablished(myCurrentTimeStamp, myCounterpartNotionTimestamp))
      {
        continue;
      }

      // Trace only when the heartbeat flag is about to flip to true.
      if (!HeartbeatAlive.Value)
      {
        Log.WhenTrace()?.Log($"Connection is alive after receiving PING {Id}: " +
                             $"receivedTimestamp: {pingTimestamp}, " +
                             $"receivedCounterpartTimestamp: {pingCounterpartTimestamp}, " +
                             $"currentTimeStamp: {myCurrentTimeStamp}, " +
                             $"counterpartTimestamp: {myCounterpartTimestamp}, " +
                             $"counterpartNotionTimestamp: {myCounterpartNotionTimestamp}");
      }

      HeartbeatAlive.Value = true;
      continue;
    }

    // For non-ping packages an Int64 sequence number follows the length field.
    long seqN = UnsafeReader.ReadInt64FromBytes(header, sizeof(int));

    if (len == ACK_MSG_LEN)
    {
      SendBuffer.Acknowledge(seqN);
      continue;
    }

    // Data package: read the payload of the announced length.
    myPkg.Clear();
    var payloadRead = myPkg.Read(ref mySocketBuffer, ReceiveFromSocket, len);
    if (!payloadRead)
    {
      return 0;
    }

    // TODO new client, possible duplicate problem if ack for seqN=1 from previous client's connection hasn't passed
    var alreadySeen = seqN <= myMaxReceivedSeqn && seqN != 1;
    if (alreadySeen)
    {
      // Payload is not delivered; only the sequence number is passed to the acktor.
      myAcktor.SendAsync(seqN);
      continue;
    }

    myMaxReceivedSeqn = seqN; // will be acknowledged when we read whole message
    Assertion.Assert(myPkg.Available > 0);

    var copiedCount = Math.Min(size, myPkg.Available);
    myPkg.MoveTo(buffer, offset, copiedCount);
    return copiedCount;
  }
}