src/Shared/NodeEndpointOutOfProcBase.cs (466 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//-----------------------------------------------------------------------
// </copyright>
// <summary>Base class for the implementation of a node endpoint for out-of-proc nodes.</summary>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Diagnostics;
using Microsoft.Build.Framework;
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
using System.Security;
#if FEATURE_SECURITY_PERMISSIONS || FEATURE_PIPE_SECURITY
using System.Security.AccessControl;
#endif
using System.Security.Principal;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
#if FEATURE_SECURITY_PERMISSIONS
using System.Security.Permissions;
#endif
namespace Microsoft.Build.BackEnd
{
/// <summary>
/// This is an implementation of INodeEndpoint for the out-of-proc nodes. It acts only as a client.
/// </summary>
internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
{
#region Private Data
/// <summary>
/// The amount of time to wait for the client to connect to the host.
/// </summary>
private const int ClientConnectTimeout = 60000;
/// <summary>
/// The size of the buffers to use for named pipes
/// </summary>
private const int PipeBufferSize = 131072;
/// <summary>
/// Flag indicating if we should debug communications or not.
/// </summary>
private bool _debugCommunications = false;
/// <summary>
/// The current communication status of the node.
/// </summary>
private LinkStatus _status;
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
/// <summary>
/// The pipe client used by the nodes.
/// </summary>
private NamedPipeServerStream _pipeServer;
#else
private AnonymousPipeClientStream _pipeClientToServer;
private AnonymousPipeClientStream _pipeServerToClient;
#endif
// The following private data fields are used only when the endpoint is in ASYNCHRONOUS mode.
/// <summary>
/// Object used as a lock source for the async data
/// </summary>
private object _asyncDataMonitor;
/// <summary>
/// Set when a packet is available in the packet queue
/// </summary>
private AutoResetEvent _packetAvailable;
/// <summary>
/// Set when the asynchronous packet pump should terminate
/// </summary>
private AutoResetEvent _terminatePacketPump;
/// <summary>
/// The thread which runs the asynchronous packet pump
/// </summary>
private Thread _packetPump;
/// <summary>
/// The factory used to create and route packets.
/// </summary>
private INodePacketFactory _packetFactory;
/// <summary>
/// The asynchronous packet queue.
/// </summary>
/// <remarks>
/// Operations on this queue must be synchronized since it is accessible by multiple threads.
/// Use a lock on the packetQueue itself.
/// </remarks>
private Queue<INodePacket> _packetQueue;
/// <summary>
/// Per-node shared read buffer.
/// </summary>
private SharedReadBuffer _sharedReadBuffer;
#endregion
#region INodeEndpoint Events
/// <summary>
/// Raised when the link status has changed.
/// </summary>
public event LinkStatusChangedDelegate OnLinkStatusChanged;
#endregion
#region INodeEndpoint Properties
/// <summary>
/// Returns the link status of this node.
/// </summary>
public LinkStatus LinkStatus
{
get { return _status; }
}
#endregion
#region Properties
#endregion
#region INodeEndpoint Methods
/// <summary>
/// Causes this endpoint to wait for the remote endpoint to connect
/// </summary>
/// <param name="factory">The factory used to create packets.</param>
public void Listen(INodePacketFactory factory)
{
ErrorUtilities.VerifyThrow(_status == LinkStatus.Inactive, "Link not inactive. Status is {0}", _status);
ErrorUtilities.VerifyThrowArgumentNull(factory, "factory");
_packetFactory = factory;
InitializeAsyncPacketThread();
}
/// <summary>
/// Causes this node to connect to the matched endpoint.
/// </summary>
/// <param name="factory">The factory used to create packets.</param>
public void Connect(INodePacketFactory factory)
{
ErrorUtilities.ThrowInternalError("Connect() not valid on the out of proc endpoint.");
}
/// <summary>
/// Shuts down the link
/// </summary>
public void Disconnect()
{
InternalDisconnect();
}
/// <summary>
/// Sends data to the peer endpoint.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
// PERF: Set up a priority system so logging packets are sent only when all other packet types have been sent.
if (_status == LinkStatus.Active)
{
EnqueuePacket(packet);
}
}
#endregion
#region Construction
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
/// <summary>
/// Instantiates an endpoint to act as a client
/// </summary>
/// <param name="pipeName">The name of the pipe to which we should connect.</param>
internal void InternalConstruct(string pipeName)
{
ErrorUtilities.VerifyThrowArgumentLength(pipeName, "pipeName");
_debugCommunications = (Environment.GetEnvironmentVariable("MSBUILDDEBUGCOMM") == "1");
_status = LinkStatus.Inactive;
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
#if FEATURE_PIPE_SECURITY
SecurityIdentifier identifier = WindowsIdentity.GetCurrent().Owner;
PipeSecurity security = new PipeSecurity();
// Restrict access to just this account. We set the owner specifically here, and on the
// pipe client side they will check the owner against this one - they must have identical
// SIDs or the client will reject this server. This is used to avoid attacks where a
// hacked server creates a less restricted pipe in an attempt to lure us into using it and
// then sending build requests to the real pipe client (which is the MSBuild Build Manager.)
PipeAccessRule rule = new PipeAccessRule(identifier, PipeAccessRights.ReadWrite, AccessControlType.Allow);
security.AddAccessRule(rule);
security.SetOwner(identifier);
#endif
_pipeServer = new NamedPipeServerStream
(
pipeName,
PipeDirection.InOut,
1, // Only allow one connection at a time.
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous | PipeOptions.WriteThrough,
PipeBufferSize, // Default input buffer
PipeBufferSize // Default output buffer
#if FEATURE_PIPE_SECURITY
, security,
HandleInheritability.None
#endif
);
}
#else
internal void InternalConstruct(string clientToServerPipeHandle, string serverToClientPipeHandle)
{
ErrorUtilities.VerifyThrowArgumentLength(clientToServerPipeHandle, "clientToServerPipeHandle");
ErrorUtilities.VerifyThrowArgumentLength(serverToClientPipeHandle, "serverToClientPipeHandle");
_debugCommunications = (Environment.GetEnvironmentVariable("MSBUILDDEBUGCOMM") == "1");
_status = LinkStatus.Inactive;
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
_pipeClientToServer = new AnonymousPipeClientStream(PipeDirection.Out, clientToServerPipeHandle);
_pipeServerToClient = new AnonymousPipeClientStream(PipeDirection.In, serverToClientPipeHandle);
}
#endif
#endregion
/// <summary>
/// Returns the host handshake for this node endpoint
/// </summary>
protected abstract long GetHostHandshake();
/// <summary>
/// Returns the client handshake for this node endpoint
/// </summary>
protected abstract long GetClientHandshake();
/// <summary>
/// Updates the current link status if it has changed and notifies any registered delegates.
/// </summary>
/// <param name="newStatus">The status the node should now be in.</param>
protected void ChangeLinkStatus(LinkStatus newStatus)
{
ErrorUtilities.VerifyThrow(_status != newStatus, "Attempting to change status to existing status {0}.", _status);
CommunicationsUtilities.Trace("Changing link status from {0} to {1}", _status.ToString(), newStatus.ToString());
_status = newStatus;
RaiseLinkStatusChanged(_status);
}
/// <summary>
/// Invokes the OnLinkStatusChanged event in a thread-safe manner.
/// </summary>
/// <param name="newStatus">The new status of the endpoint link.</param>
private void RaiseLinkStatusChanged(LinkStatus newStatus)
{
if (null != OnLinkStatusChanged)
{
LinkStatusChangedDelegate linkStatusDelegate = OnLinkStatusChanged;
linkStatusDelegate(this, newStatus);
}
}
#region Private Methods
/// <summary>
/// This does the actual work of changing the status and shutting down any threads we may have for
/// disconnection.
/// </summary>
private void InternalDisconnect()
{
ErrorUtilities.VerifyThrow(_packetPump.ManagedThreadId != Thread.CurrentThread.ManagedThreadId, "Can't join on the same thread.");
_terminatePacketPump.Set();
_packetPump.Join();
#if CLR2COMPATIBILITY
_terminatePacketPump.Close();
#else
_terminatePacketPump.Dispose();
#endif
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
_pipeServer.Dispose();
#else
_pipeClientToServer.Dispose();
_pipeServerToClient.Dispose();
#endif
_packetPump = null;
ChangeLinkStatus(LinkStatus.Inactive);
}
#region Asynchronous Mode Methods
/// <summary>
/// Adds a packet to the packet queue when asynchronous mode is enabled.
/// </summary>
/// <param name="packet">The packet to be transmitted.</param>
private void EnqueuePacket(INodePacket packet)
{
ErrorUtilities.VerifyThrowArgumentNull(packet, "packet");
ErrorUtilities.VerifyThrow(null != _packetQueue, "packetQueue is null");
ErrorUtilities.VerifyThrow(null != _packetAvailable, "packetAvailable is null");
lock (_packetQueue)
{
_packetQueue.Enqueue(packet);
_packetAvailable.Set();
}
}
/// <summary>
/// Initializes the packet pump thread and the supporting events as well as the packet queue.
/// </summary>
private void InitializeAsyncPacketThread()
{
lock (_asyncDataMonitor)
{
_packetPump = new Thread(PacketPumpProc);
_packetPump.IsBackground = true;
_packetPump.Name = "OutOfProc Endpoint Packet Pump";
_packetAvailable = new AutoResetEvent(false);
_terminatePacketPump = new AutoResetEvent(false);
_packetQueue = new Queue<INodePacket>();
_packetPump.Start();
}
}
/// <summary>
/// This method handles the asynchronous message pump. It waits for messages to show up on the queue
/// and calls FireDataAvailable for each such packet. It will terminate when the terminate event is
/// set.
/// </summary>
private void PacketPumpProc()
{
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
NamedPipeServerStream localPipeServer = _pipeServer;
PipeStream localWritePipe = _pipeServer;
PipeStream localReadPipe = _pipeServer;
#else
PipeStream localWritePipe = _pipeClientToServer;
PipeStream localReadPipe = _pipeServerToClient;
#endif
AutoResetEvent localPacketAvailable = _packetAvailable;
AutoResetEvent localTerminatePacketPump = _terminatePacketPump;
Queue<INodePacket> localPacketQueue = _packetQueue;
DateTime originalWaitStartTime = DateTime.UtcNow;
bool gotValidConnection = false;
while (!gotValidConnection)
{
DateTime restartWaitTime = DateTime.UtcNow;
// We only wait to wait the difference between now and the last original start time, in case we have multiple hosts attempting
// to attach. This prevents each attempt from resetting the timer.
TimeSpan usedWaitTime = restartWaitTime - originalWaitStartTime;
int waitTimeRemaining = Math.Max(0, CommunicationsUtilities.NodeConnectionTimeout - (int)usedWaitTime.TotalMilliseconds);
try
{
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
// Wait for a connection
#if FEATURE_APM
IAsyncResult resultForConnection = localPipeServer.BeginWaitForConnection(null, null);
#else
Task connectionTask = localPipeServer.WaitForConnectionAsync();
#endif
CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining);
#if FEATURE_APM
bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false);
#else
bool connected = connectionTask.Wait(waitTimeRemaining);
#endif
if (!connected)
{
CommunicationsUtilities.Trace("Connection timed out waiting a host to contact us. Exiting comm thread.");
ChangeLinkStatus(LinkStatus.ConnectionFailed);
return;
}
CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent");
#if FEATURE_APM
localPipeServer.EndWaitForConnection(resultForConnection);
#endif
#endif
// The handshake protocol is a simple long exchange. The host sends us a long, and we
// respond with another long. Once the handshake is complete, both sides can be assured the
// other is ready to accept data.
// To avoid mixing client and server builds, the long is the MSBuild binary timestamp.
// Compatibility issue here.
// Previous builds of MSBuild 4.0 would exchange just a byte.
// Host would send either 0x5F or 0x60 depending on whether it was the toolset or not respectively.
// Client would return either 0xF5 or 0x06 respectively.
// Therefore an old host on a machine with new clients running will hang,
// sending a byte and waiting for a byte until it eventually times out;
// because the new client will want 7 more bytes before it returns anything.
// The other way around is not a problem, because the old client would immediately return the (wrong)
// byte on receiving the first byte of the long sent by the new host, and the new host would disconnect.
// To avoid the hang, special case here:
// Make sure our handshakes always start with 00.
// If we received ONLY one byte AND it's 0x5F or 0x60, return 0xFF (it doesn't matter what as long as
// it will cause the host to reject us; new hosts expect 00 and old hosts expect F5 or 06).
try
{
long handshake = localReadPipe.ReadLongForHandshake(/* reject these leads */ new byte[] { 0x5F, 0x60 }, 0xFF /* this will disconnect the host; it expects leading 00 or F5 or 06 */);
#if FEATURE_SECURITY_PERMISSIONS
WindowsIdentity currentIdentity = WindowsIdentity.GetCurrent();
string remoteUserName = localPipeServer.GetImpersonationUserName();
#endif
if (handshake != GetHostHandshake())
{
CommunicationsUtilities.Trace("Handshake failed. Received {0} from host not {1}. Probably the host is a different MSBuild build.", handshake, GetHostHandshake());
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
localPipeServer.Disconnect();
#else
localWritePipe.Dispose();
localReadPipe.Dispose();
#endif
continue;
}
#if FEATURE_SECURITY_PERMISSIONS
// We will only talk to a host that was started by the same user as us. Even though the pipe access is set to only allow this user, we want to ensure they
// haven't attempted to change those permissions out from under us. This ensures that the only way they can truly gain access is to be impersonating the
// user we were started by.
WindowsIdentity clientIdentity = null;
localPipeServer.RunAsClient(delegate () { clientIdentity = WindowsIdentity.GetCurrent(true); });
if (clientIdentity == null || !String.Equals(clientIdentity.Name, currentIdentity.Name, StringComparison.OrdinalIgnoreCase))
{
CommunicationsUtilities.Trace("Handshake failed. Host user is {0} but we were created by {1}.", (clientIdentity == null) ? "<unknown>" : clientIdentity.Name, currentIdentity.Name);
localPipeServer.Disconnect();
continue;
}
#endif
}
catch (IOException
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
e
#endif
)
{
// We will get here when:
// 1. The host (OOP main node) connects to us, it immediately checks for user privileges
// and if they don't match it disconnects immediately leaving us still trying to read the blank handshake
// 2. The host is too old sending us bits we automatically reject in the handshake
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
CommunicationsUtilities.Trace("Client connection failed but we will wait for another connection. Exception: {0}", e.Message);
if (localPipeServer.IsConnected)
{
localPipeServer.Disconnect();
}
continue;
#else
throw;
#endif
}
gotValidConnection = true;
}
catch (Exception e)
{
if (ExceptionHandling.IsCriticalException(e))
{
throw;
}
CommunicationsUtilities.Trace("Client connection failed. Exiting comm thread. {0}", e);
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
if (localPipeServer.IsConnected)
{
localPipeServer.Disconnect();
}
#else
localWritePipe.Dispose();
localReadPipe.Dispose();
#endif
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Failed);
return;
}
}
CommunicationsUtilities.Trace("Writing handshake to parent");
localWritePipe.WriteLongForHandshake(GetClientHandshake());
ChangeLinkStatus(LinkStatus.Active);
RunReadLoop(
new BufferedReadStream(localReadPipe),
localWritePipe,
localPacketQueue, localPacketAvailable, localTerminatePacketPump);
CommunicationsUtilities.Trace("Ending read loop");
try
{
#if FEATURE_NAMED_PIPES_FULL_DUPLEX
if (localPipeServer.IsConnected)
{
localPipeServer.WaitForPipeDrain();
localPipeServer.Disconnect();
}
#else
localReadPipe.Dispose();
localWritePipe.WaitForPipeDrain();
localWritePipe.Dispose();
#endif
}
catch (Exception)
{
// We don't really care if Disconnect somehow fails, but it gives us a chance to do the right thing.
}
}
private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
Queue<INodePacket> localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump)
{
// Ordering of the wait handles is important. The first signalled wait handle in the array
// will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
// terminate event triggered so that we cannot get into a situation where packets are being
// spammed to the endpoint and it never gets an opportunity to shutdown.
CommunicationsUtilities.Trace("Entering read loop.");
byte[] headerByte = new byte[5];
#if FEATURE_APM
IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
bool exitLoop = false;
do
{
// Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all
// packets to be sent by other threads which are shutting down, such as the logging thread.
WaitHandle[] handles = new WaitHandle[] {
#if FEATURE_APM
result.AsyncWaitHandle,
#else
((IAsyncResult)readTask).AsyncWaitHandle,
#endif
localPacketAvailable, localTerminatePacketPump };
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
{
case 0:
{
int bytesRead = 0;
try
{
#if FEATURE_APM
bytesRead = localReadPipe.EndRead(result);
#else
bytesRead = readTask.Result;
#endif
}
catch (Exception e)
{
// Lost communications. Abort (but allow node reuse)
CommunicationsUtilities.Trace("Exception reading from server. {0}", e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Inactive);
exitLoop = true;
break;
}
if (bytesRead != headerByte.Length)
{
// Incomplete read. Abort.
if (bytesRead == 0)
{
CommunicationsUtilities.Trace("Parent disconnected abruptly");
}
else
{
CommunicationsUtilities.Trace("Incomplete header read from server. {0} of {1} bytes read", bytesRead, headerByte.Length);
}
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
int packetLength = BitConverter.ToInt32(headerByte, 1);
try
{
_packetFactory.DeserializeAndRoutePacket(0, packetType, NodePacketTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer));
}
catch (Exception e)
{
// Error while deserializing or handling packet. Abort.
CommunicationsUtilities.Trace("Exception while deserializing packet {0}: {1}", packetType, e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
#if FEATURE_APM
result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
}
break;
case 1:
case 2:
try
{
int packetCount = localPacketQueue.Count;
// Write out all the queued packets.
while (packetCount > 0)
{
INodePacket packet;
lock (_packetQueue)
{
packet = localPacketQueue.Dequeue();
}
MemoryStream packetStream = new MemoryStream();
INodePacketTranslator writeTranslator = NodePacketTranslator.GetWriteTranslator(packetStream);
packetStream.WriteByte((byte)packet.Type);
// Pad for packet length
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
// Reset the position in the write buffer.
packet.Translate(writeTranslator);
// Now write in the actual packet length
packetStream.Position = 1;
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
#if FEATURE_MEMORYSTREAM_GETBUFFER
localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
#else
ArraySegment<byte> packetStreamBuffer;
if (packetStream.TryGetBuffer(out packetStreamBuffer))
{
localWritePipe.Write(packetStreamBuffer.Array, packetStreamBuffer.Offset, packetStreamBuffer.Count);
}
else
{
localWritePipe.Write(packetStream.ToArray(), 0, (int)packetStream.Length);
}
#endif
packetCount--;
}
}
catch (Exception e)
{
// Error while deserializing or handling packet. Abort.
CommunicationsUtilities.Trace("Exception while serializing packets: {0}", e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
if (waitId == 2)
{
CommunicationsUtilities.Trace("Disconnecting voluntarily");
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
}
break;
default:
ErrorUtilities.ThrowInternalError("waitId {0} out of range.", waitId);
break;
}
}
while (!exitLoop);
}
#endregion
#endregion
}
}