in src/Shared/NodeEndpointOutOfProcBase.cs [541:708]
/// <summary>
/// Pumps packets in both directions until the link drops or termination is requested:
/// reads 5-byte packet headers from <paramref name="localReadPipe"/> and hands the pipe to the
/// packet factory for deserialization, and writes queued packets to <paramref name="localWritePipe"/>.
/// </summary>
/// <param name="localReadPipe">Stream that incoming packet headers and bodies are read from.</param>
/// <param name="localWritePipe">Stream that serialized outgoing packets are written to.</param>
/// <param name="localPacketQueue">Queue of outgoing packets; drained whenever either event is signalled.</param>
/// <param name="localPacketAvailable">Event whose signalling makes the loop drain the queue.</param>
/// <param name="localTerminatePacketPump">Event whose signalling makes the loop drain the queue one last time and exit.</param>
private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
    Queue<INodePacket> localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump)
{
    // Ordering of the wait handles is important. WaitAny returns the lowest index among the
    // handles that are signalled, so with the array built below a completed read wins over
    // "packet available", which in turn wins over "terminate". Either of the last two drains
    // the outgoing queue before the loop is allowed to exit.
    CommunicationsUtilities.Trace("Entering read loop.");

    // Wire header: 1 byte of packet type followed by a 4-byte length (see the write path below).
    byte[] headerByte = new byte[5];
#if FEATURE_APM
    IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
    Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif

    bool exitLoop = false;
    do
    {
        // Ordering is important. We want packetAvailable to supersede terminate, otherwise we will not properly
        // wait for all packets to be sent by other threads which are shutting down, such as the logging thread.
        WaitHandle[] handles = new WaitHandle[] {
#if FEATURE_APM
            result.AsyncWaitHandle,
#else
            ((IAsyncResult)readTask).AsyncWaitHandle,
#endif
            localPacketAvailable, localTerminatePacketPump };

        int waitId = WaitHandle.WaitAny(handles);
        switch (waitId)
        {
            case 0:
                {
                    // The pending header read has completed (successfully or not).
                    int bytesRead = 0;
                    try
                    {
#if FEATURE_APM
                        bytesRead = localReadPipe.EndRead(result);
#else
                        bytesRead = readTask.Result;
#endif
                    }
                    catch (Exception e)
                    {
                        // Lost communications. Abort (but allow node reuse)
                        CommunicationsUtilities.Trace("Exception reading from server. {0}", e);
                        ExceptionHandling.DumpExceptionToFile(e);
                        ChangeLinkStatus(LinkStatus.Inactive);
                        exitLoop = true;
                        break;
                    }

                    if (bytesRead != headerByte.Length)
                    {
                        // Incomplete read. Abort.
                        if (bytesRead == 0)
                        {
                            CommunicationsUtilities.Trace("Parent disconnected abruptly");
                        }
                        else
                        {
                            CommunicationsUtilities.Trace("Incomplete header read from server. {0} of {1} bytes read", bytesRead, headerByte.Length);
                        }

                        ChangeLinkStatus(LinkStatus.Failed);
                        exitLoop = true;
                        break;
                    }

                    // Only the type byte of the header is consumed here; the length field in bytes 1-4 is not
                    // read on this path, since the translator is handed the pipe itself for the packet body.
                    NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);

                    try
                    {
                        _packetFactory.DeserializeAndRoutePacket(0, packetType, NodePacketTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer));
                    }
                    catch (Exception e)
                    {
                        // Error while deserializing or handling packet. Abort.
                        CommunicationsUtilities.Trace("Exception while deserializing packet {0}: {1}", packetType, e);
                        ExceptionHandling.DumpExceptionToFile(e);
                        ChangeLinkStatus(LinkStatus.Failed);
                        exitLoop = true;
                        break;
                    }

                    // Queue up the read of the next header.
#if FEATURE_APM
                    result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
                    readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
                }

                break;

            case 1:
            case 2:
                try
                {
                    // Snapshot the number of queued packets under the same lock that guards Dequeue;
                    // Queue<T> is not safe to read while another thread may be modifying it.
                    int packetCount;
                    lock (_packetQueue)
                    {
                        packetCount = localPacketQueue.Count;
                    }

                    // Write out all the queued packets.
                    while (packetCount > 0)
                    {
                        INodePacket packet;
                        lock (_packetQueue)
                        {
                            packet = localPacketQueue.Dequeue();
                        }

                        using (MemoryStream packetStream = new MemoryStream())
                        {
                            INodePacketTranslator writeTranslator = NodePacketTranslator.GetWriteTranslator(packetStream);

                            packetStream.WriteByte((byte)packet.Type);

                            // Pad for packet length
                            packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);

                            // Serialize the packet body after the 5-byte header.
                            packet.Translate(writeTranslator);

                            // Now write in the actual packet length (total length minus the 5-byte header).
                            packetStream.Position = 1;
                            packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);

#if FEATURE_MEMORYSTREAM_GETBUFFER
                            localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
#else
                            ArraySegment<byte> packetStreamBuffer;
                            if (packetStream.TryGetBuffer(out packetStreamBuffer))
                            {
                                localWritePipe.Write(packetStreamBuffer.Array, packetStreamBuffer.Offset, packetStreamBuffer.Count);
                            }
                            else
                            {
                                localWritePipe.Write(packetStream.ToArray(), 0, (int)packetStream.Length);
                            }
#endif
                        }

                        packetCount--;
                    }
                }
                catch (Exception e)
                {
                    // Error while serializing or sending a packet. Abort.
                    CommunicationsUtilities.Trace("Exception while serializing packets: {0}", e);
                    ExceptionHandling.DumpExceptionToFile(e);
                    ChangeLinkStatus(LinkStatus.Failed);
                    exitLoop = true;
                    break;
                }

                if (waitId == 2)
                {
                    // Terminate was signalled: the queue has been flushed above, now leave the loop.
                    CommunicationsUtilities.Trace("Disconnecting voluntarily");
                    ChangeLinkStatus(LinkStatus.Failed);
                    exitLoop = true;
                }

                break;

            default:
                ErrorUtilities.ThrowInternalError("waitId {0} out of range.", waitId);
                break;
        }
    }
    while (!exitLoop);
}