in src/CRA.ClientLibrary/Main/CRAWorker.cs [764:829]
private async Task EgressToVertexInput(string fromVertexName, string fromVertexOutput, string toVertexName, string toVertexInput,
CancellationTokenSource source)
{
try
{
if (_localVertexTable[fromVertexName].OutputEndpoints.ContainsKey(fromVertexOutput))
{
var fromVertex = _localVertexTable[fromVertexName].OutputEndpoints[fromVertexOutput] as IFusableVertexOutputEndpoint;
var toVertex = _localVertexTable[toVertexName].InputEndpoints[toVertexInput] as IVertexInputEndpoint;
if (fromVertex != null && toVertex != null && fromVertex.CanFuseWith(toVertex, toVertexName, toVertexInput))
{
await
Task.Run(() => fromVertex.ToInput(toVertex, toVertexName, toVertexInput, source.Token));
}
else
{
Trace.TraceError("Unable to create fused connection");
return;
}
}
else if (_localVertexTable[fromVertexName].AsyncOutputEndpoints.ContainsKey(fromVertexOutput))
{
var fromVertex = _localVertexTable[fromVertexName].AsyncOutputEndpoints[fromVertexOutput] as IAsyncFusableVertexOutputEndpoint;
var toVertex = _localVertexTable[toVertexName].AsyncInputEndpoints[toVertexInput] as IAsyncVertexInputEndpoint;
if (fromVertex != null && toVertex != null && fromVertex.CanFuseWith(toVertex, toVertexName, toVertexInput))
{
await fromVertex.ToInputAsync(toVertex, toVertexName, toVertexInput, source.Token);
}
else
{
Trace.TraceError("Unable to create fused connection");
return;
}
}
else
{
Trace.TraceError("Unable to create fused connection");
return;
}
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
{
oldSource.Dispose();
await _craClient.DisconnectAsync(fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
}
}
catch (Exception e)
{
Trace.TraceInformation("Exception (" + e.ToString() + ") in outgoing stream - reconnecting");
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
{
oldSource.Dispose();
}
else
{
Trace.TraceError("Unexpected: caught exception in ToStream but entry absent in outConnections");
}
// Retry following while connection not in list
await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, false);
}
}