private void RunReadLoop()

in src/Shared/NodeEndpointOutOfProcBase.cs [541:708]


        private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
            Queue<INodePacket> localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump)
        {
            // Ordering of the wait handles is important.  The first signalled wait handle in the array 
            // will be returned by WaitAny if multiple wait handles are signalled.  We prefer to have the
            // terminate event triggered so that we cannot get into a situation where packets are being
            // spammed to the endpoint and it never gets an opportunity to shutdown.
            CommunicationsUtilities.Trace("Entering read loop.");
            byte[] headerByte = new byte[5];
#if FEATURE_APM
            IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
            Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif

            bool exitLoop = false;
            do
            {
                // Ordering is important.  We want packetAvailable to supercede terminate otherwise we will not properly wait for all
                // packets to be sent by other threads which are shutting down, such as the logging thread.
                WaitHandle[] handles = new WaitHandle[] {
#if FEATURE_APM
                    result.AsyncWaitHandle,
#else
                    ((IAsyncResult)readTask).AsyncWaitHandle,
#endif
                    localPacketAvailable, localTerminatePacketPump };

                int waitId = WaitHandle.WaitAny(handles);
                switch (waitId)
                {
                    case 0:
                        {
                            int bytesRead = 0;
                            try
                            {
#if FEATURE_APM
                                bytesRead = localReadPipe.EndRead(result);
#else
                                bytesRead = readTask.Result;
#endif
                            }
                            catch (Exception e)
                            {
                                // Lost communications.  Abort (but allow node reuse)
                                CommunicationsUtilities.Trace("Exception reading from server.  {0}", e);
                                ExceptionHandling.DumpExceptionToFile(e);
                                ChangeLinkStatus(LinkStatus.Inactive);
                                exitLoop = true;
                                break;
                            }

                            if (bytesRead != headerByte.Length)
                            {
                                // Incomplete read.  Abort.
                                if (bytesRead == 0)
                                {
                                    CommunicationsUtilities.Trace("Parent disconnected abruptly");
                                }
                                else
                                {
                                    CommunicationsUtilities.Trace("Incomplete header read from server.  {0} of {1} bytes read", bytesRead, headerByte.Length);
                                }

                                ChangeLinkStatus(LinkStatus.Failed);
                                exitLoop = true;
                                break;
                            }

                            NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
                            int packetLength = BitConverter.ToInt32(headerByte, 1);

                            try
                            {
                                _packetFactory.DeserializeAndRoutePacket(0, packetType, NodePacketTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer));
                            }
                            catch (Exception e)
                            {
                                // Error while deserializing or handling packet.  Abort.
                                CommunicationsUtilities.Trace("Exception while deserializing packet {0}: {1}", packetType, e);
                                ExceptionHandling.DumpExceptionToFile(e);
                                ChangeLinkStatus(LinkStatus.Failed);
                                exitLoop = true;
                                break;
                            }

#if FEATURE_APM
                            result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
                            readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
                        }

                        break;

                    case 1:
                    case 2:
                        try
                        {
                            int packetCount = localPacketQueue.Count;

                            // Write out all the queued packets.
                            while (packetCount > 0)
                            {
                                INodePacket packet;
                                lock (_packetQueue)
                                {
                                    packet = localPacketQueue.Dequeue();
                                }

                                MemoryStream packetStream = new MemoryStream();
                                INodePacketTranslator writeTranslator = NodePacketTranslator.GetWriteTranslator(packetStream);

                                packetStream.WriteByte((byte)packet.Type);

                                // Pad for packet length
                                packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);

                                // Reset the position in the write buffer.
                                packet.Translate(writeTranslator);

                                // Now write in the actual packet length
                                packetStream.Position = 1;
                                packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);

#if FEATURE_MEMORYSTREAM_GETBUFFER
                                localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
#else
                                ArraySegment<byte> packetStreamBuffer;
                                if (packetStream.TryGetBuffer(out packetStreamBuffer))
                                {
                                    localWritePipe.Write(packetStreamBuffer.Array, packetStreamBuffer.Offset, packetStreamBuffer.Count);
                                }
                                else
                                {
                                    localWritePipe.Write(packetStream.ToArray(), 0, (int)packetStream.Length);
                                }
#endif

                                packetCount--;
                            }
                        }
                        catch (Exception e)
                        {
                            // Error while deserializing or handling packet.  Abort.
                            CommunicationsUtilities.Trace("Exception while serializing packets: {0}", e);
                            ExceptionHandling.DumpExceptionToFile(e);
                            ChangeLinkStatus(LinkStatus.Failed);
                            exitLoop = true;
                            break;
                        }

                        if (waitId == 2)
                        {
                            CommunicationsUtilities.Trace("Disconnecting voluntarily");
                            ChangeLinkStatus(LinkStatus.Failed);
                            exitLoop = true;
                        }

                        break;

                    default:
                        ErrorUtilities.ThrowInternalError("waitId {0} out of range.", waitId);
                        break;
                }
            }
            while (!exitLoop);
        }