async Task<bool> ReceivePumpCoreAsync()

in src/Microsoft.Azure.Relay/HybridConnectionListener.cs [841:889]


            /// <summary>
            /// Runs one pass of the receive loop: awaits the connected WebSocket, accumulates incoming
            /// frames in <c>receiveBuffer</c> and hands every completed message to
            /// <c>listener.OnCommandAsync</c>.
            /// </summary>
            /// <returns>
            /// <c>false</c> when a Close message arrives and <c>listener.closeCalled</c> is set;
            /// the result of <c>OnDisconnect</c> when a Close message arrives otherwise or when a
            /// non-fatal exception is caught; <c>true</c> when the loop ends only because shutdown
            /// was requested.
            /// </returns>
            async Task<bool> ReceivePumpCoreAsync()
            {
                bool keepGoing = true;
                CancellationToken shutdownToken = this.shutdownCancellationSource.Token;
                Task<WebSocket> connectTask = this.EnsureConnectTask(shutdownToken);
                try
                {
                    WebSocket webSocket = await connectTask.ConfigureAwait(false);

                    // Number of bytes of the current (possibly multi-frame) message already in receiveBuffer.
                    int totalBytesRead = 0;
                    do
                    {
                        // Read into whatever part of the receive buffer the current message has not filled yet.
                        var currentReadBuffer = new ArraySegment<byte>(this.receiveBuffer.Array, this.receiveBuffer.Offset + totalBytesRead, this.receiveBuffer.Count - totalBytesRead);

                        // NOTE(review): the receive is deliberately not tied to shutdownToken here, presumably so the
                        // service's Close response can still be observed after shutdown was requested - confirm.
                        WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(currentReadBuffer, CancellationToken.None).ConfigureAwait(false);
                        if (receiveResult.MessageType == WebSocketMessageType.Close)
                        {
                            await this.CloseOrAbortWebSocketAsync(
                                connectTask, false, receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, shutdownToken).ConfigureAwait(false);
                            if (this.listener.closeCalled)
                            {
                                // This is the cloud service responding to our clean shutdown.
                                keepGoing = false;
                            }
                            else
                            {
                                // The remote side closed without us asking; OnDisconnect decides whether to keep pumping.
                                keepGoing = this.OnDisconnect(new ConnectionLostException(receiveResult.CloseStatus.Value + ": " + receiveResult.CloseStatusDescription));
                            }

                            break;
                        }

                        totalBytesRead += receiveResult.Count;
                        if (receiveResult.EndOfMessage)
                        {
                            var commandBuffer = new ArraySegment<byte>(this.receiveBuffer.Array, this.receiveBuffer.Offset, totalBytesRead);
                            await this.listener.OnCommandAsync(commandBuffer, webSocket).ConfigureAwait(false);

                            // Start the next message at the beginning of the buffer.
                            totalBytesRead = 0;
                        }
                        else if (currentReadBuffer.Count == 0)
                        {
                            // The buffer was already full before this read and the message is still incomplete.
                            // totalBytesRead can no longer advance, so every further read would be zero-length as
                            // well; fail here instead of looping. The catch below aborts the socket and reports
                            // the disconnect.
                            throw new WebSocketException(
                                WebSocketError.Faulted,
                                "The received message exceeds the receive buffer size of " + this.receiveBuffer.Count + " bytes.");
                        }
                    }
                    while (!shutdownToken.IsCancellationRequested);
                }
                catch (Exception exception) when (!Fx.IsFatal(exception))
                {
                    // Non-fatal failure: log it, abort the socket and let OnDisconnect decide whether to continue.
                    RelayEventSource.Log.HandledExceptionAsWarning(this.listener, exception);
                    await this.CloseOrAbortWebSocketAsync(connectTask, abort: true).ConfigureAwait(false);
                    keepGoing = this.OnDisconnect(WebSocketExceptionHelper.ConvertToRelayContract(exception, this.listener.TrackingContext));
                }

                return keepGoing;
            }