in src/CRA.ClientLibrary/Main/CRAWorker.cs [278:434]
/// <summary>
/// Initiates a connection from <paramref name="fromVertexName"/>/<paramref name="fromVertexOutput"/>
/// to <paramref name="toVertexName"/>/<paramref name="toVertexInput"/>.
/// Looks up the active row of the peer vertex, reuses (or cancels) an already
/// registered connection, attempts a fused connection, and otherwise sends a
/// connect request to the peer instance and starts the egress/ingress task.
/// </summary>
/// <param name="fromVertexName">Name of the vertex on the sending side.</param>
/// <param name="fromVertexOutput">Output endpoint name on the sending vertex.</param>
/// <param name="toVertexName">Name of the vertex on the receiving side.</param>
/// <param name="toVertexInput">Input endpoint name on the receiving vertex.</param>
/// <param name="reverse">
/// When true, the row of <paramref name="fromVertexName"/> is looked up, the connection is
/// tracked in <c>inConnections</c>, a CONNECT_VERTEX_RECEIVER_REVERSE request is sent and
/// <c>IngressFromStream</c> is started. It is reset to false when the looked-up vertex
/// lives on this instance.
/// </param>
/// <param name="sharding">Forwarded unchanged to <c>EgressToStream</c>/<c>IngressFromStream</c>.</param>
/// <param name="killIfExists">If a connection with the same key is already registered, cancel its token source.</param>
/// <param name="killRemote">Sent to the peer as the final 1/0 flag of the connect request.</param>
/// <returns>
/// <c>ActiveVertexNotFound</c> if the vertex lookup throws;
/// <c>Success</c> if a connection already exists, a fused connection was made, or a new connection was registered;
/// <c>ConnectionEstablishFailed</c> if obtaining the stream throws;
/// the peer's error code if it answers with a non-zero value;
/// <c>ConnectionAdditionRace</c> if the connection key was registered concurrently.
/// </returns>
internal async Task<CRAErrorCode> Connect_InitiatorSide(
    string fromVertexName,
    string fromVertexOutput,
    string toVertexName,
    string toVertexInput,
    bool reverse,
    bool sharding = true,
    bool killIfExists = true,
    bool killRemote = true)
{
    VertexInfo row;

    try
    {
        // Need to get the latest address & port
        row = (await (reverse
            ? _vertexInfoProvider.GetRowForActiveVertex(fromVertexName)
            : _vertexInfoProvider.GetRowForActiveVertex(toVertexName))).Value;
    }
    catch
    {
        return CRAErrorCode.ActiveVertexNotFound;
    }

    // If from and to vertices are on the same (this) instance,
    // we can convert a "reverse" connection into a normal connection
    if (reverse && (row.InstanceName == InstanceName))
    {
        reverse = false;
    }

    // 'reverse' is final from here on, so the key and the table can be computed once.
    string connectionKey = fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput;
    var conn = reverse ? inConnections : outConnections;

    // Returns true when the connection is already registered; optionally cancels it first.
    bool ConnectionAlreadyRegistered()
    {
        if (!conn.TryGetValue(connectionKey, out CancellationTokenSource existing))
        {
            return false;
        }

        if (killIfExists)
        {
            Trace.TraceInformation("Deleting prior connection - it will automatically reconnect");
            existing.Cancel();
        }
        return true;
    }

    if (ConnectionAlreadyRegistered())
    {
        return CRAErrorCode.Success;
    }

    if (TryFusedConnect(row.InstanceName, fromVertexName, fromVertexOutput, toVertexName, toVertexInput))
    {
        return CRAErrorCode.Success;
    }

    // Re-check the connection table as someone may have successfully
    // created a fused connection
    if (ConnectionAlreadyRegistered())
    {
        return CRAErrorCode.Success;
    }

    // Send request to CRA instance
    Stream ns = null;
    VertexInfo _row = default;
    TcpClient client = null;

    try
    {
        _row = (await _vertexInfoProvider.GetRowForInstanceVertex(row.InstanceName, "")).Value;

        // Get a stream connection from the pool if available
        if (!_craClient.TryGetSenderStreamFromPool(_row.Address, _row.Port.ToString(), out ns))
        {
            client = new TcpClient();
            client.NoDelay = true;
            await client.ConnectAsync(_row.Address, _row.Port, _tcpConnectTimeoutMs);

            ns = _craClient.SecureStreamConnectionDescriptor
                .CreateSecureClient(client.GetStream(), row.InstanceName);
        }
    }
    catch
    {
        // A client created above would otherwise be left open when the
        // connect or the secure-stream setup fails.
        client?.Close();
        return CRAErrorCode.ConnectionEstablishFailed;
    }

    CRAErrorCode result;
    try
    {
        if (!reverse)
            ns.WriteInt32((int)CRATaskMessageType.CONNECT_VERTEX_RECEIVER);
        else
            ns.WriteInt32((int)CRATaskMessageType.CONNECT_VERTEX_RECEIVER_REVERSE);

        ns.WriteByteArray(Encoding.UTF8.GetBytes(fromVertexName));
        ns.WriteByteArray(Encoding.UTF8.GetBytes(fromVertexOutput));
        ns.WriteByteArray(Encoding.UTF8.GetBytes(toVertexName));
        ns.WriteByteArray(Encoding.UTF8.GetBytes(toVertexInput));
        ns.WriteInt32(killRemote ? 1 : 0);
        result = (CRAErrorCode)ns.ReadInt32();
    }
    catch
    {
        // Release the stream, then let the original exception reach the caller unchanged.
        ns.Close();
        throw;
    }

    if (result != 0)
    {
        Trace.TraceError("Error occurred while establishing the connection!!");
        // Nothing else in this method takes ownership of the stream on this path.
        ns.Close();
        return result;
    }

    CancellationTokenSource source = new CancellationTokenSource();

    // 'conn' is inConnections for a reverse connection and outConnections otherwise.
    if (!conn.TryAdd(connectionKey, source))
    {
        source.Dispose();
        ns.Close();
        Trace.TraceInformation("Race adding connection - deleting outgoing stream");
        return CRAErrorCode.ConnectionAdditionRace;
    }

    // The data-transfer task is started and deliberately not awaited.
    if (!reverse)
    {
        _ = Task.Run(() =>
            EgressToStream(
                fromVertexName,
                fromVertexOutput,
                toVertexName,
                toVertexInput,
                reverse,
                ns,
                source,
                _row.Address,
                _row.Port,
                sharding));
    }
    else
    {
        _ = Task.Run(() =>
            IngressFromStream(
                fromVertexName,
                fromVertexOutput,
                toVertexName,
                toVertexInput,
                reverse,
                ns,
                source,
                _row.Address,
                _row.Port,
                sharding));
    }

    return CRAErrorCode.Success;
}