private async Task EgressToVertexInput()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [764:829]


        /// <summary>
        /// Drives a "fused" connection: data is handed from an output endpoint of one vertex
        /// straight to an input endpoint of another vertex, with both vertices resolved through
        /// <c>_localVertexTable</c>.
        /// </summary>
        /// <remarks>
        /// The synchronous endpoint table is consulted first; if the output is not registered
        /// there, the asynchronous endpoint table is tried. When the endpoints cannot be fused,
        /// an error is traced and the method returns without touching <c>outConnections</c>.
        /// When the transfer call completes normally, the connection entry is removed from
        /// <c>outConnections</c>, its token source is disposed and the connection is disconnected
        /// through <c>_craClient</c>. When any exception is raised, the entry is removed and
        /// disposed and <c>RetryRestoreConnection</c> is invoked.
        /// </remarks>
        /// <param name="fromVertexName">Name of the vertex that owns the output endpoint.</param>
        /// <param name="fromVertexOutput">Name of the output endpoint on the source vertex.</param>
        /// <param name="toVertexName">Name of the vertex that owns the input endpoint.</param>
        /// <param name="toVertexInput">Name of the input endpoint on the destination vertex.</param>
        /// <param name="source">Token source whose token is passed to the endpoint transfer call.</param>
        /// <returns>A task that completes when the transfer and the follow-up bookkeeping are done.</returns>
        private async Task EgressToVertexInput(string fromVertexName, string fromVertexOutput, string toVertexName, string toVertexInput,
            CancellationTokenSource source)
        {
            // Entries in outConnections are keyed by "fromVertex:fromOutput:toVertex:toInput".
            string connectionKey = string.Join(":", fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
            CancellationTokenSource trackedSource;

            try
            {
                if (_localVertexTable[fromVertexName].OutputEndpoints.ContainsKey(fromVertexOutput))
                {
                    // Synchronous endpoint pair.
                    var fusableOutput = _localVertexTable[fromVertexName].OutputEndpoints[fromVertexOutput] as IFusableVertexOutputEndpoint;
                    var targetInput = _localVertexTable[toVertexName].InputEndpoints[toVertexInput] as IVertexInputEndpoint;

                    bool canFuse = fusableOutput != null
                        && targetInput != null
                        && fusableOutput.CanFuseWith(targetInput, toVertexName, toVertexInput);
                    if (!canFuse)
                    {
                        Trace.TraceError("Unable to create fused connection");
                        return;
                    }

                    // The synchronous transfer is pushed onto the thread pool and awaited there.
                    await Task.Run(() => fusableOutput.ToInput(targetInput, toVertexName, toVertexInput, source.Token));
                }
                else if (_localVertexTable[fromVertexName].AsyncOutputEndpoints.ContainsKey(fromVertexOutput))
                {
                    // Asynchronous endpoint pair.
                    var asyncFusableOutput = _localVertexTable[fromVertexName].AsyncOutputEndpoints[fromVertexOutput] as IAsyncFusableVertexOutputEndpoint;
                    var asyncTargetInput = _localVertexTable[toVertexName].AsyncInputEndpoints[toVertexInput] as IAsyncVertexInputEndpoint;

                    bool canFuseAsync = asyncFusableOutput != null
                        && asyncTargetInput != null
                        && asyncFusableOutput.CanFuseWith(asyncTargetInput, toVertexName, toVertexInput);
                    if (!canFuseAsync)
                    {
                        Trace.TraceError("Unable to create fused connection");
                        return;
                    }

                    await asyncFusableOutput.ToInputAsync(asyncTargetInput, toVertexName, toVertexInput, source.Token);
                }
                else
                {
                    // The output endpoint is registered in neither endpoint table.
                    Trace.TraceError("Unable to create fused connection");
                    return;
                }

                // The transfer call returned without throwing: drop the bookkeeping entry and,
                // only if it was still present, disconnect through the client.
                if (outConnections.TryRemove(connectionKey, out trackedSource))
                {
                    trackedSource.Dispose();
                    await _craClient.DisconnectAsync(fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
                }
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Exception (" + e.ToString() + ") in outgoing stream - reconnecting");

                if (outConnections.TryRemove(connectionKey, out trackedSource))
                {
                    trackedSource.Dispose();
                }
                else
                {
                    Trace.TraceError("Unexpected: caught exception in ToStream but entry absent in outConnections");
                }

                // Retry following while connection not in list
                await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, false);
            }
        }