private void ConnectVertex_Receiver()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [580:642]


        /// <summary>
        /// Handles the receiving side of a vertex connection request arriving on a stream.
        /// Reads the connection descriptor from the stream, verifies that the locally hosted
        /// vertex and the requested endpoint exist, and then delegates to
        /// <c>Connect_ReceiverSide</c>.
        /// </summary>
        /// <remarks>
        /// Values are read from the stream in this order: source vertex name, source output
        /// endpoint name, destination vertex name, destination input endpoint name (each a byte
        /// array decoded as UTF-8), followed by an Int32 flag that is treated as
        /// "kill if exists" when it equals 1.
        /// When validation fails, a <c>CRAErrorCode</c> is written back to the stream and the
        /// stream is handed to <c>TryReuseReceiverStream</c> on the thread pool.
        /// </remarks>
        /// <param name="streamObject">The connection stream; must be castable to <see cref="Stream"/>.</param>
        /// <param name="reverse">
        /// When false, the destination vertex and its input endpoint are validated locally;
        /// when true, the source vertex and its output endpoint are validated locally.
        /// </param>
        private void ConnectVertex_Receiver(object streamObject, bool reverse = false)
        {
            var stream = (Stream)streamObject;

            // The read order below is the wire protocol; do not reorder.
            string fromVertexName = Encoding.UTF8.GetString(stream.ReadByteArray());
            string fromVertexOutput = Encoding.UTF8.GetString(stream.ReadByteArray());
            string toVertexName = Encoding.UTF8.GetString(stream.ReadByteArray());
            string toVertexInput = Encoding.UTF8.GetString(stream.ReadByteArray());
            bool killIfExists = stream.ReadInt32() == 1;

            // Reports the failure to the peer, then offers the stream for reuse off-thread.
            void Reject(CRAErrorCode errorCode)
            {
                stream.WriteInt32((int)errorCode);
                Task.Run(() => TryReuseReceiverStream(stream));
            }

            // The vertex that must be hosted locally depends on the connection direction.
            string localVertexName = reverse ? fromVertexName : toVertexName;

            // Single lookup: avoids repeated ContainsKey + indexer accesses, and cannot throw
            // KeyNotFoundException if the entry disappears between the check and the read.
            if (!_localVertexTable.TryGetValue(localVertexName, out var localVertex))
            {
                Reject(CRAErrorCode.VertexNotFound);
                return;
            }

            bool endpointExists;
            if (!reverse)
            {
                var key = GetShardedVertexName(toVertexInput);
                endpointExists = localVertex.InputEndpoints.ContainsKey(key)
                    || localVertex.AsyncInputEndpoints.ContainsKey(key);
            }
            else
            {
                // NOTE(review): unlike the forward path, the output endpoint name is looked up
                // without GetShardedVertexName — preserved as-is; confirm this is intended.
                endpointExists = localVertex.OutputEndpoints.ContainsKey(fromVertexOutput)
                    || localVertex.AsyncOutputEndpoints.ContainsKey(fromVertexOutput);
            }

            if (!endpointExists)
            {
                // NOTE(review): a missing *output* endpoint (reverse path) is also reported as
                // VertexInputNotFound; kept for wire compatibility — confirm whether a dedicated
                // output error code should be used instead.
                Reject(CRAErrorCode.VertexInputNotFound);
                return;
            }

            int result = Connect_ReceiverSide(
                fromVertexName,
                fromVertexOutput,
                toVertexName,
                toVertexInput,
                stream,
                reverse,
                killIfExists);

            // Do not close and dispose stream because it is being reused for data.
            // A non-zero result hands the stream to TryReuseReceiverStream instead.
            if (result != 0)
            {
                Task.Run(() => TryReuseReceiverStream(stream));
            }
        }