private async Task RunLoopAsync()

in src/common/PortForward/ServicePortForwardManager.cs [142:230]


            /// <summary>
            /// Forwards traffic for a single local TCP connection: opens a remote port-forward stream through
            /// the agent client, writes data received from that stream to the local connection, and sends data
            /// read from the local connection to the remote stream until either side closes or cancellation
            /// is requested.
            /// </summary>
            /// <param name="connection">The local TCP connection whose network stream is forwarded.</param>
            /// <param name="cancellationToken">
            /// Token that stops the loop. It is linked into a per-connection source so that this connection can
            /// also be cancelled on its own (remote close, local EOF, send failure).
            /// </param>
            /// <returns>
            /// A task that completes when forwarding has ended. Exceptions raised inside the forwarding logic
            /// are logged rather than propagated to the caller.
            /// </returns>
            private async Task RunLoopAsync(TcpClient connection, CancellationToken cancellationToken)
            {
                // Return control to the caller right away; the rest of the method runs as a continuation.
                await Task.Yield();

                using (var requestProcessingCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
                using (requestProcessingCancellationTokenSource.Token.Register(() => this.StopLocal(connection)))
                {
                    // Capture the token once. The agent callbacks below may still be invoked after this method
                    // has returned and disposed the source, and reading .Token from a disposed source throws.
                    var requestToken = requestProcessingCancellationTokenSource.Token;

                    try
                    {
                        var stream = connection.GetStream();
                        int streamId = 0;
                        requestToken.ThrowIfCancellationRequested();
                        streamId = await _agentClient.ServicePortForwardStartAsync(_serviceDns, _servicePort,
                            async (data) =>
                            {
                                // Data handler: remote -> local. streamId is captured by reference, so it reads 0
                                // if this runs before the start call above has returned the real id.
                                _log.Verbose($"ServicePortForward: received {data.Length} bytes in stream {streamId}.");
                                await stream.WriteAsync(data, 0, data.Length, requestToken);
                            },
                            () =>
                            {
                                // Closed handler: the remote side is gone, so stop processing this connection.
                                _log.Verbose($"ServicePortForward: stream {streamId} closed.");
                                try
                                {
                                    requestProcessingCancellationTokenSource.Cancel();
                                }
                                catch (ObjectDisposedException)
                                {
                                    // The loop already finished and disposed the source; nothing left to cancel.
                                }
                            },
                            requestToken);
                        _log.Verbose($"ServicePortForward: stream {streamId} connected.");

                        byte[] buffer = new byte[BUFFER_SIZE];

                        // Local -> remote pump.
                        while (!requestToken.IsCancellationRequested)
                        {
                            int cRead = 0;
                            try
                            {
                                cRead = await stream.ReadAsync(buffer, requestToken);
                            }
                            catch (IOException ex)
                            {
                                if (ex.InnerException is OperationCanceledException)
                                {
                                    // Cancellation requested
                                    break;
                                }
                                if (ex.InnerException is SocketException se && (se.SocketErrorCode == SocketError.ConnectionReset || se.SocketErrorCode == SocketError.OperationAborted))
                                {
                                    _log.Verbose($"Connection is already closed by DevHostAgent on socket {streamId} (RunLoopAsync)");
                                    requestProcessingCancellationTokenSource.Cancel();
                                    break;
                                }

                                throw;
                            }

                            if (cRead == 0)
                            {
                                // A zero-byte read means the local peer closed its side: tear down both ends.
                                _log.Verbose($"ServicePortForward: stream {streamId}: StopLocal");
                                requestProcessingCancellationTokenSource.Cancel();
                                await this.StopRemoteAsync(streamId);
                                break;
                            }
                            else
                            {
                                _log.Verbose($"ServicePortForward: sending {cRead} bytes via stream {streamId}");
                                // Copy out exactly the bytes read; the shared buffer is reused on the next iteration.
                                byte[] content = new byte[cRead];
                                Array.Copy(buffer, content, cRead);
                                try
                                {
                                    await _agentClient.ServicePortForwardSendAsync(_serviceDns, _servicePort, streamId, content, requestToken);
                                }
                                catch (Exception ex)
                                {
                                    _log.Verbose($"ServicePortForward: stream {streamId} send failed with {ex}.");
                                    // The remote stream is being stopped, so further reads/sends are pointless.
                                    // Tear down both ends and leave the loop, mirroring the local-EOF path.
                                    requestProcessingCancellationTokenSource.Cancel();
                                    await this.StopRemoteAsync(streamId);
                                    break;
                                }
                            }
                        }
                    }
                    // Ignore this type of exception since it arises when we try to use a socket that is closed which may happen at the time of cancellation
                    // System.IO.IOException: Unable to read data from the transport connection: The I/O operation has been aborted because of either a thread exit or an application request..
                    // ---> System.Net.Sockets.SocketException (995): The I/O operation has been aborted because of either a thread exit or an application request
                    // Bug https://devdiv.visualstudio.com/DevDiv/_workitems/edit/1172442
                    // The pattern match is null-safe: an IOException without an inner exception falls through to the general handler.
                    catch (IOException ex) when (ex.InnerException is SocketException) { _log.ExceptionAsWarning(ex); }
                    catch (Exception ex)
                    {
                        _log.Verbose($"ServicePortForwarder.RunLoopAsync exception: {ex}");
                    }
                }
            }