private async Task IngressFromStream()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [875:989]


        private async Task IngressFromStream(
            string fromVertexName,
            string fromVertexOutput,
            string toVertexName,
            string toVertexInput,
            bool reverse,
            Stream ns,
            CancellationTokenSource source,
            string address = null,
            int port = -1,
            bool sharding = true)
        {
            try
            {
                string key = toVertexInput;
                int shardId = -1;
//#if SHARDING
                if (sharding)
                {
                    key = GetShardedVertexName(toVertexInput);
                    shardId = GetShardedVertexShardId(toVertexInput);

                    if (shardId >= 0)
                    {
                        var skey = GetShardedVertexName(toVertexName) + ":" + key + ":" + GetShardedVertexName(fromVertexName);
                        while (true)
                        {
                            if (shardingInfoTable.ContainsKey(skey))
                            {
                                var si = shardingInfoTable[skey];
                                if (si.AllShards.Contains(GetShardedVertexShardId(fromVertexName)))
                                    break;
                                var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(fromVertexName));
                                if (shardingInfoTable.TryUpdate(skey, newSI, si))
                                {
                                    ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(fromVertexName), newSI);
                                    break;
                                }
                            }
                            else
                            {
                                var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(fromVertexName));
                                if (shardingInfoTable.TryAdd(skey, newSI))
                                {
                                    ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(fromVertexName), newSI);
                                    break;
                                }
                            }
                        }
                    }
                }
//#endif
                if (_localVertexTable[toVertexName].InputEndpoints.ContainsKey(key))
                {
                    if (shardId < 0)
                        await Task.Run(
                            () => _localVertexTable[toVertexName].InputEndpoints[toVertexInput]
                            .FromStream(ns, fromVertexName, fromVertexOutput, source.Token), source.Token);
                    else
                        throw new NotImplementedException();
                }
                else if (_localVertexTable[toVertexName].AsyncInputEndpoints.ContainsKey(key))
                {
                    if (shardId < 0)
                        await _localVertexTable[toVertexName].AsyncInputEndpoints[toVertexInput].FromStreamAsync(ns, fromVertexName, fromVertexOutput, source.Token);
                    else
                        await ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key])
                            .FromStreamAsync(ns, GetShardedVertexName(fromVertexName),
                            shardId, GetShardedVertexName(fromVertexOutput), source.Token);
                }
                else
                {
                    Trace.TraceError("Unable to find input endpoint (on to side)");
                    return;
                }

                // Completed FromStream successfully
                CancellationTokenSource oldSource;
                if (inConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
                {
                    oldSource.Dispose();

                    if (address != null && port != -1)
                    {
                        // Add/Return a sender stream connection to the pool
                        if (!_craClient.TryAddSenderStreamToPool(address, port.ToString(), (NetworkStream)ns))
                        {
                            ns.Dispose();
                        }
                    }
                    else
                    {
                        // Keep a receiver stream connection to be used later
#pragma warning disable CS4014
                        Task.Run(() => TryReuseReceiverStream(ns));
#pragma warning restore CS4014
                    }
                }
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Exception (" + e.ToString() + ") in incoming stream - reconnecting");
                CancellationTokenSource tokenSource;
                if (inConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out tokenSource))
                {
                    tokenSource.Dispose();
                }
                else
                {
                    Trace.TraceError("Unexpected: caught exception in FromStream but entry absent in inConnections");
                }

                await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, true);
            }
        }