in src/Microsoft.Azure.Relay/HybridConnectionListener.cs [841:889]
async Task<bool> ReceivePumpCoreAsync()
{
bool keepGoing = true;
CancellationToken shutdownToken = this.shutdownCancellationSource.Token;
Task<WebSocket> connectTask = this.EnsureConnectTask(shutdownToken);
try
{
WebSocket webSocket = await connectTask.ConfigureAwait(false);
int totalBytesRead = 0;
do
{
var currentReadBuffer = new ArraySegment<byte>(this.receiveBuffer.Array, this.receiveBuffer.Offset + totalBytesRead, this.receiveBuffer.Count - totalBytesRead);
WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(currentReadBuffer, CancellationToken.None).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
await this.CloseOrAbortWebSocketAsync(
connectTask, false, receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, shutdownToken).ConfigureAwait(false);
if (this.listener.closeCalled)
{
// This is the cloud service responding to our clean shutdown.
keepGoing = false;
}
else
{
keepGoing = this.OnDisconnect(new ConnectionLostException(receiveResult.CloseStatus.Value + ": " + receiveResult.CloseStatusDescription));
}
break;
}
totalBytesRead += receiveResult.Count;
if (receiveResult.EndOfMessage)
{
var commandBuffer = new ArraySegment<byte>(this.receiveBuffer.Array, this.receiveBuffer.Offset, totalBytesRead);
await this.listener.OnCommandAsync(commandBuffer, webSocket).ConfigureAwait(false);
totalBytesRead = 0;
}
}
while (!shutdownToken.IsCancellationRequested);
}
catch (Exception exception) when (!Fx.IsFatal(exception))
{
RelayEventSource.Log.HandledExceptionAsWarning(this.listener, exception);
await this.CloseOrAbortWebSocketAsync(connectTask, abort: true).ConfigureAwait(false);
keepGoing = this.OnDisconnect(WebSocketExceptionHelper.ConvertToRelayContract(exception, this.listener.TrackingContext));
}
return keepGoing;
}