in src/CRA.ClientLibrary/Main/CRAWorker.cs [436:517]
/// <summary>
/// Registers an incoming connection request against the matching connection
/// table and, when registration succeeds, starts the transfer routine for it
/// on a background task.
/// </summary>
/// <param name="fromVertexName">Name of the source vertex; first component of the connection key.</param>
/// <param name="fromVertexOutput">Output endpoint on the source vertex; second component of the key.</param>
/// <param name="toVertexName">Name of the destination vertex; third component of the key.</param>
/// <param name="toVertexInput">Input endpoint on the destination vertex; fourth component of the key.</param>
/// <param name="stream">Stream the status code is written to and that is handed to the transfer routine.</param>
/// <param name="reverse">
/// When true the connection is tracked in <c>outConnections</c> and served by
/// <c>EgressToStream</c>; otherwise it is tracked in <c>inConnections</c> and
/// served by <c>IngressFromStream</c>.
/// </param>
/// <param name="killIfExists">When true, an already-registered connection with the same key is cancelled.</param>
/// <returns>
/// <c>CRAErrorCode.ServerRecovering</c> if a connection with the same key was already registered,
/// <c>CRAErrorCode.ConnectionAdditionRace</c> if another caller registered the key first,
/// otherwise <c>CRAErrorCode.Success</c> (each cast to <c>int</c>).
/// </returns>
private int Connect_ReceiverSide(
    string fromVertexName,
    string fromVertexOutput,
    string toVertexName,
    string toVertexInput,
    Stream stream,
    bool reverse,
    bool killIfExists = true)
{
    // Both tables are keyed by "fromVertex:fromOutput:toVertex:toInput".
    string connectionKey = string.Join(":", fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
    var registry = reverse ? outConnections : inConnections;

    CancellationTokenSource existing;
    if (registry.TryGetValue(connectionKey, out existing))
    {
        if (killIfExists)
        {
            Trace.TraceInformation("Deleting prior connection - it will automatically reconnect");
            existing.Cancel();
        }
        else
        {
            Trace.TraceInformation("There exists prior connection - not killing");
        }

        // The request is rejected in either case; the same code goes to the
        // peer (via the stream) and to the local caller.
        int recovering = (int)CRAErrorCode.ServerRecovering;
        stream.WriteInt32(recovering);
        return recovering;
    }

    // No prior entry: report 0 on the stream before attempting registration.
    // NOTE(review): 0 is presumably the success code - confirm against CRAErrorCode.
    stream.WriteInt32(0);

    var tokenSource = new CancellationTokenSource();
    if (!registry.TryAdd(connectionKey, tokenSource))
    {
        // Someone else registered this key between the lookup and the add.
        tokenSource.Dispose();
        stream.Close();
        Trace.TraceInformation("Race adding connection - deleting incoming stream");
        return (int)CRAErrorCode.ConnectionAdditionRace;
    }

    // Fire-and-forget: the transfer routine runs on the thread pool and is
    // given the token source that was just registered.
    if (reverse)
    {
        Task.Run(() =>
            EgressToStream(
                fromVertexName,
                fromVertexOutput,
                toVertexName,
                toVertexInput,
                reverse,
                stream,
                tokenSource));
    }
    else
    {
        Task.Run(() =>
            IngressFromStream(
                fromVertexName,
                fromVertexOutput,
                toVertexName,
                toVertexInput,
                reverse,
                stream,
                tokenSource));
    }

    return (int)CRAErrorCode.Success;
}