private async Task EgressToStream()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [644:762]


        /// <summary>
        /// Streams data from an output endpoint of a local vertex into <paramref name="ns"/>,
        /// then tears the connection down (normal completion) or asks for it to be
        /// restored (failure).
        /// </summary>
        /// <remarks>
        /// Steps, in order:
        /// 1. When <paramref name="sharding"/> is set, the endpoint key and shard id are derived
        ///    from <paramref name="fromVertexOutput"/>. For a non-negative shard id, the cached
        ///    sharding info for this (source, output, destination) triple is refreshed until it
        ///    lists the destination's shard, and the refreshed info is pushed to the endpoint.
        /// 2. The endpoint is looked up first among the synchronous output endpoints, then among
        ///    the asynchronous ones. If neither contains the key, a message is traced and the
        ///    method returns without touching <c>outConnections</c>.
        /// 3. When the transfer returns normally, the connection's entry is removed from
        ///    <c>outConnections</c>; if it was present, its token source is disposed, the stream
        ///    is recycled, and the connection is disconnected through the client.
        /// 4. Any exception is traced, the <c>outConnections</c> entry is removed and disposed,
        ///    and <c>RetryRestoreConnection</c> is awaited.
        /// </remarks>
        /// <param name="fromVertexName">Name of the local vertex that owns the output endpoint.</param>
        /// <param name="fromVertexOutput">Name of the output endpoint on the source vertex.</param>
        /// <param name="toVertexName">Name of the destination vertex.</param>
        /// <param name="toVertexInput">Name of the input endpoint on the destination vertex.</param>
        /// <param name="reverse">Not read by this method.</param>
        /// <param name="ns">Stream the endpoint writes to.</param>
        /// <param name="source">Token source whose token is passed to the endpoint's transfer call.</param>
        /// <param name="address">Remote address; together with <paramref name="port"/> selects sender-pool recycling.</param>
        /// <param name="port">Remote port, or -1 when not supplied.</param>
        /// <param name="sharding">Whether to resolve sharded endpoint names before the lookup.</param>
        /// <exception cref="NotImplementedException">
        /// Raised internally for a sharded synchronous endpoint; it is caught by this method's
        /// own handler and therefore results in a reconnect attempt rather than propagating.
        /// </exception>
        private async Task EgressToStream(
            string fromVertexName,
            string fromVertexOutput,
            string toVertexName,
            string toVertexInput,
            bool reverse,
            Stream ns,
            CancellationTokenSource source,
            string address = null,
            int port = -1,
            bool sharding = true)
        {
            // Key under which this connection is tracked in outConnections; built once so the
            // success path and the failure path always agree on it.
            string connectionKey = fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput;

            try
            {
                string key = fromVertexOutput;
                int shardId = -1;
//#if SHARDING
                if (sharding)
                {
                    key = GetShardedVertexName(fromVertexOutput);
                    shardId = GetShardedVertexShardId(fromVertexOutput);

                    if (shardId >= 0)
                    {
                        var skey = GetShardedVertexName(fromVertexName) + ":" + key + ":" + GetShardedVertexName(toVertexName);

                        // Loop until the cached sharding info covers the destination shard, or
                        // until this call wins the race to install freshly fetched info.
                        while (true)
                        {
                            // Single atomic read: a separate ContainsKey + indexer could throw
                            // if the entry disappeared between the two calls.
                            if (shardingInfoTable.TryGetValue(skey, out var si))
                            {
                                if (si.AllShards.Contains(GetShardedVertexShardId(toVertexName)))
                                {
                                    break;
                                }

                                var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(toVertexName));

                                // Only replace the exact value that was inspected; if another
                                // caller changed it meanwhile, go round again and re-check.
                                if (shardingInfoTable.TryUpdate(skey, newSI, si))
                                {
                                    ((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(toVertexName), newSI);
                                    break;
                                }
                            }
                            else
                            {
                                var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(toVertexName));
                                if (shardingInfoTable.TryAdd(skey, newSI))
                                {
                                    ((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(toVertexName), newSI);
                                    break;
                                }
                            }
                        }
                    }
                }
//#endif
                // NOTE(review): the presence checks below use `key`, while the unsharded lookups
                // use `fromVertexOutput`. Presumably the two are equal whenever shardId < 0 -
                // confirm against GetShardedVertexName before unifying them.
                if (_localVertexTable[fromVertexName].OutputEndpoints.ContainsKey(key))
                {
                    if (shardId < 0)
                    {
                        // Synchronous endpoint: ToStream is run via Task.Run so that this
                        // method can await its completion.
                        await
                            Task.Run(() =>
                                _localVertexTable[fromVertexName].OutputEndpoints[fromVertexOutput]
                                    .ToStream(ns, toVertexName, toVertexInput, source.Token), source.Token);
                    }
                    else
                    {
                        // Sharded output is only available on asynchronous endpoints.
                        throw new NotImplementedException();
                    }
                }
                else if (_localVertexTable[fromVertexName].AsyncOutputEndpoints.ContainsKey(key))
                {
                    if (shardId < 0)
                    {
                        await _localVertexTable[fromVertexName].AsyncOutputEndpoints[fromVertexOutput].ToStreamAsync(ns, toVertexName, toVertexInput, source.Token);
                    }
                    else
                    {
                        await ((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key])
                            .ToStreamAsync(ns, GetShardedVertexName(toVertexName),
                            shardId, GetShardedVertexName(toVertexInput), source.Token);
                    }
                }
                else
                {
                    Trace.TraceInformation("Unable to find output endpoint (on from side)");
                    return;
                }

                // The transfer returned without throwing: retire the connection.
                CancellationTokenSource oldSource;
                if (outConnections.TryRemove(connectionKey, out oldSource))
                {
                    oldSource.Dispose();

                    if (address != null && port != -1)
                    {
                        // Add/Return a sender stream connection to the pool.
                        // `ns` is declared as Stream, so check the runtime type instead of
                        // casting blindly: an InvalidCastException here would be caught below
                        // and trigger a reconnect for a transfer that finished normally.
                        var networkStream = ns as NetworkStream;
                        if (networkStream == null
                            || !_craClient.TryAddSenderStreamToPool(address, port.ToString(), networkStream))
                        {
                            ns.Dispose();
                        }
                    }
                    else
                    {
                        // Keep a receiver stream connection to be used later
                        // (fire-and-forget: the task is intentionally not awaited).
#pragma warning disable CS4014
                        Task.Run(() => TryReuseReceiverStream(ns));
#pragma warning restore CS4014
                    }

                    await _craClient.DisconnectAsync(fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
                }
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Exception (" + e.ToString() + ") in outgoing stream - reconnecting");
                CancellationTokenSource oldSource;
                if (outConnections.TryRemove(connectionKey, out oldSource))
                {
                    oldSource.Dispose();
                }
                else
                {
                    Trace.TraceError("Unexpected: caught exception in ToStream but entry absent in outConnections");
                }

                // Retry following while connection not in list
                await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, false);
            }
        }