internal async Task Connect_InitiatorSide()

in src/CRA.ClientLibrary/Main/CRAWorker.cs [278:434]


        internal async Task<CRAErrorCode> Connect_InitiatorSide(
            string fromVertexName,
            string fromVertexOutput,
            string toVertexName,
            string toVertexInput,
            bool reverse,
            bool sharding = true,
            bool killIfExists = true,
            bool killRemote = true)
        {
            VertexInfo row;

            try
            {
                // Need to get the latest address & port
                row = (await (reverse
                    ? _vertexInfoProvider.GetRowForActiveVertex(fromVertexName)
                    : _vertexInfoProvider.GetRowForActiveVertex(toVertexName))).Value;
            }
            catch
            {
                return CRAErrorCode.ActiveVertexNotFound;
            }

            // If from and to vertices are on the same (this) instance,
            // we can convert a "reverse" connection into a normal connection
            if (reverse && (row.InstanceName == InstanceName))
                reverse = false;

            CancellationTokenSource oldSource;
            var conn = reverse ? inConnections : outConnections;
            if (conn.TryGetValue(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput,
                out oldSource))
            {
                if (killIfExists)
                {
                    Trace.TraceInformation("Deleting prior connection - it will automatically reconnect");
                    oldSource.Cancel();
                }
                return CRAErrorCode.Success;
            }

            if (TryFusedConnect(row.InstanceName, fromVertexName, fromVertexOutput, toVertexName, toVertexInput))
            {
                return CRAErrorCode.Success;
            }

            // Re-check the connection table as someone may have successfully
            // created a fused connection
            if (conn.TryGetValue(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput,
                out oldSource))
            {
                if (killIfExists)
                {
                    Trace.TraceInformation("Deleting prior connection - it will automatically reconnect");
                    oldSource.Cancel();
                }
                return CRAErrorCode.Success;
            }

            // Send request to CRA instance
            Stream ns = null;
            VertexInfo _row = default;

            try
            {
                _row = (await _vertexInfoProvider.GetRowForInstanceVertex(row.InstanceName, "")).Value;

                // Get a stream connection from the pool if available
                if (!_craClient.TryGetSenderStreamFromPool(_row.Address, _row.Port.ToString(), out ns))
                {
                    var client = new TcpClient();
                    client.NoDelay = true;
                    await client.ConnectAsync(_row.Address, _row.Port, _tcpConnectTimeoutMs);

                    ns = _craClient.SecureStreamConnectionDescriptor
                          .CreateSecureClient(client.GetStream(), row.InstanceName);
                }
            }
            catch
            { return CRAErrorCode.ConnectionEstablishFailed; }

            if (!reverse)
                ns.WriteInt32((int)CRATaskMessageType.CONNECT_VERTEX_RECEIVER);
            else
                ns.WriteInt32((int)CRATaskMessageType.CONNECT_VERTEX_RECEIVER_REVERSE);

            ns.WriteByteArray(Encoding.UTF8.GetBytes(fromVertexName));
            ns.WriteByteArray(Encoding.UTF8.GetBytes(fromVertexOutput));
            ns.WriteByteArray(Encoding.UTF8.GetBytes(toVertexName));
            ns.WriteByteArray(Encoding.UTF8.GetBytes(toVertexInput));
            ns.WriteInt32(killRemote ? 1 : 0);
            CRAErrorCode result = (CRAErrorCode)ns.ReadInt32();

            if (result != 0)
            {
                Trace.TraceError("Error occurred while establishing the connection!!");
                return result;
            }
            else
            {
                CancellationTokenSource source = new CancellationTokenSource();

                if (!reverse)
                {
                    if (outConnections.TryAdd(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, source))
                    {
                        var tmp = Task.Run(() =>
                            EgressToStream(
                                fromVertexName,
                                fromVertexOutput,
                                toVertexName,
                                toVertexInput,
                                reverse,
                                ns,
                                source,
                                _row.Address,
                                _row.Port, 
                                sharding));
                        return CRAErrorCode.Success;
                    }
                    else
                    {
                        source.Dispose();
                        ns.Close();
                        Trace.TraceInformation("Race adding connection - deleting outgoing stream");
                        return CRAErrorCode.ConnectionAdditionRace;
                    }
                }
                else
                {
                    if (inConnections.TryAdd(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, source))
                    {
                        var tmp = Task.Run(() => IngressFromStream(
                            fromVertexName,
                            fromVertexOutput,
                            toVertexName,
                            toVertexInput,
                            reverse,
                            ns,
                            source,
                            _row.Address,
                            _row.Port,
                            sharding));

                        return CRAErrorCode.Success;
                    }
                    else
                    {
                        source.Dispose();
                        ns.Close();
                        Trace.TraceInformation("Race adding connection - deleting outgoing stream");
                        return CRAErrorCode.ConnectionAdditionRace;
                    }
                }
            }
        }