private int Connect_ReceiverSide()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [436:517]


        private int Connect_ReceiverSide(
            string fromVertexName,
            string fromVertexOutput,
            string toVertexName,
            string toVertexInput,
            Stream stream,
            bool reverse,
            bool killIfExists = true)
        {
            CancellationTokenSource oldSource;
            var conn = reverse ? outConnections : inConnections;
            if (conn.TryGetValue(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
            {
                if (killIfExists)
                {
                    Trace.TraceInformation("Deleting prior connection - it will automatically reconnect");
                    oldSource.Cancel();
                }
                else
                {
                    Trace.TraceInformation("There exists prior connection - not killing");
                }
                stream.WriteInt32((int)CRAErrorCode.ServerRecovering);
                return (int)CRAErrorCode.ServerRecovering;
            }
            else
            {
                stream.WriteInt32(0);
            }

            CancellationTokenSource source = new CancellationTokenSource();

            if (!reverse)
            {
                if (inConnections.TryAdd(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, source))
                {
                    Task.Run(() =>
                        IngressFromStream(
                            fromVertexName,
                            fromVertexOutput,
                            toVertexName,
                            toVertexInput,
                            reverse,
                            stream,
                            source));

                    return (int)CRAErrorCode.Success;
                }
                else
                {
                    source.Dispose();
                    stream.Close();
                    Trace.TraceInformation("Race adding connection - deleting incoming stream");
                    return (int)CRAErrorCode.ConnectionAdditionRace;
                }
            }
            else
            {
                if (outConnections.TryAdd(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, source))
                {
                    Task.Run(() =>
                        EgressToStream(
                            fromVertexName,
                            fromVertexOutput,
                            toVertexName,
                            toVertexInput,
                            reverse,
                            stream,
                            source));

                    return (int)CRAErrorCode.Success;
                }
                else
                {
                    source.Dispose();
                    stream.Close();
                    Trace.TraceInformation("Race adding connection - deleting incoming stream");
                    return (int)CRAErrorCode.ConnectionAdditionRace;
                }
            }

        }