in src/common/DevHostAgent/DevHostAgentExecutorClient.cs [85:157]
/// <summary>
/// Opens the "RunReversePortForward" stream for <paramref name="port"/> and dispatches every
/// received <c>PortForwardStreamBlock</c> until cancellation is requested.
/// </summary>
/// <param name="port">Port forward description passed to "RunReversePortForward"; its <c>Port</c> value is also sent with every "StopReversePortForward" call.</param>
/// <param name="dataHandler">Awaited for each data block with the block's stream id and content.</param>
/// <param name="closedHandler">Called with the stream id when a closed block arrives, or after <paramref name="dataHandler"/> fails for that stream.</param>
/// <param name="cancellationToken">Ends the read loop; it is also the token on which the per-stream stop callbacks are registered.</param>
/// <returns>A task that completes once the read loop has exited because of cancellation.</returns>
/// <exception cref="InvalidOperationException">A block carried a flag other than connected, data or closed.</exception>
/// <remarks>
/// Only <see cref="OperationCanceledException"/> (and types derived from it) is handled around the read;
/// any other exception thrown while reading propagates to the caller.
/// NOTE(review): the connection appears to be a SignalR hub connection - confirm against _GetConnectionAsync.
/// </remarks>
public async Task ReversePortForwardStartAsync(PortForwardStartInfo port, Func<int, byte[], Task> dataHandler, Action<int> closedHandler, CancellationToken cancellationToken)
{
    var hubConnection = await this._GetConnectionAsync(cancellationToken);
    var blockReader = await hubConnection.StreamAsChannelAsync<PortForwardStreamBlock>("RunReversePortForward", port, cancellationToken);

    for (;;)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            break;
        }

        PortForwardStreamBlock block;
        try
        {
            block = await blockReader.ReadAsync(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested while waiting for the next block.
            break;
        }

        switch (block.Flag)
        {
            case PortForwardStreamFlag.Connected:
            {
                // A new stream was reported: arrange for it to be stopped remotely when the token is cancelled.
                Action stopStreamOnCancellation = async () =>
                {
                    try
                    {
                        await hubConnection.InvokeAsync("StopReversePortForward", port.Port, block.StreamId);
                    }
                    catch (TaskCanceledException)
                    {
                        // The stop call itself was canceled; nothing to report.
                    }
                    catch (Exception stopFailure)
                    {
                        // This callback runs as a fire-and-forget task, so an exception that is not handled here
                        // would leak outside the normal processing scope.
                        // Tracked for rework: https://devdiv.visualstudio.com/DevDiv/_boards/board/t/Mindaro/Stories/?workitem=1178734
                        _log.Exception(stopFailure);
                    }
                };
                _reversePortForwardStartCancellationTokenRegistrations.Add(cancellationToken.Register(stopStreamOnCancellation));
                break;
            }

            case PortForwardStreamFlag.Data:
            {
                try
                {
                    await dataHandler(block.StreamId, block.Content);
                }
                catch (Exception handlerFailure)
                {
                    // The handler failed: stop this stream remotely (best effort), then log the
                    // handler failure and report the stream as closed.
                    try
                    {
                        await hubConnection.InvokeAsync("StopReversePortForward", port.Port, block.StreamId);
                    }
                    catch (Exception stopFailure)
                    {
                        _log.Exception(stopFailure);
                    }

                    _log.Exception(handlerFailure);
                    closedHandler(block.StreamId);
                }

                break;
            }

            case PortForwardStreamFlag.Closed:
            {
                closedHandler(block.StreamId);
                break;
            }

            default:
            {
                throw new InvalidOperationException("Invalid protocol - expect flag 2 (data) or 3 (closed).");
            }
        }
    }
}