in src/CRA.ClientLibrary/Main/CRAWorker.cs [580:642]
/// <summary>
/// Handles the receiving side of a vertex connection request arriving on a stream.
/// Reads the connection description from the stream, checks that the locally hosted
/// vertex and the requested endpoint exist, and then delegates to
/// <c>Connect_ReceiverSide</c>.
/// </summary>
/// <param name="streamObject">
/// The request stream, passed as <see cref="object"/> and cast to <see cref="Stream"/>.
/// </param>
/// <param name="reverse">
/// When <c>false</c>, the "to" vertex and its input endpoint are validated locally;
/// when <c>true</c>, the "from" vertex and its output endpoint are validated instead.
/// </param>
/// <remarks>
/// On a validation failure the error code is written back to the stream and the
/// stream is handed to <c>TryReuseReceiverStream</c> via <c>Task.Run</c>. The same
/// hand-off happens when <c>Connect_ReceiverSide</c> returns a non-zero result.
/// </remarks>
private void ConnectVertex_Receiver(object streamObject, bool reverse = false)
{
    var stream = (Stream)streamObject;

    // The reads below consume the request in the order the fields appear on the
    // stream; do not reorder them.
    string fromVertexName = Encoding.UTF8.GetString(stream.ReadByteArray());
    string fromVertexOutput = Encoding.UTF8.GetString(stream.ReadByteArray());
    string toVertexName = Encoding.UTF8.GetString(stream.ReadByteArray());
    string toVertexInput = Encoding.UTF8.GetString(stream.ReadByteArray());
    bool killIfExists = stream.ReadInt32() == 1;

    // Replies with the given error code and hands the stream over for reuse.
    void Reject(CRAErrorCode errorCode)
    {
        stream.WriteInt32((int)errorCode);
        Task.Run(() => TryReuseReceiverStream(stream));
    }

    if (!reverse)
    {
        // Single lookup: avoids the ContainsKey-then-indexer pattern, which both
        // repeats the lookup and can throw if the entry disappears in between.
        if (!_localVertexTable.TryGetValue(toVertexName, out var toVertex))
        {
            Reject(CRAErrorCode.VertexNotFound);
            return;
        }

        var key = GetShardedVertexName(toVertexInput);
        if (!toVertex.InputEndpoints.ContainsKey(key) &&
            !toVertex.AsyncInputEndpoints.ContainsKey(key))
        {
            Reject(CRAErrorCode.VertexInputNotFound);
            return;
        }
    }
    else
    {
        if (!_localVertexTable.TryGetValue(fromVertexName, out var fromVertex))
        {
            Reject(CRAErrorCode.VertexNotFound);
            return;
        }

        if (!fromVertex.OutputEndpoints.ContainsKey(fromVertexOutput) &&
            !fromVertex.AsyncOutputEndpoints.ContainsKey(fromVertexOutput))
        {
            // NOTE(review): this reports VertexInputNotFound although the missing
            // endpoint is an output; kept as-is for compatibility with existing
            // callers - confirm whether an output-specific code should be used.
            Reject(CRAErrorCode.VertexInputNotFound);
            return;
        }
    }

    int result = Connect_ReceiverSide(
        fromVertexName,
        fromVertexOutput,
        toVertexName,
        toVertexInput,
        stream,
        reverse,
        killIfExists);

    // Do not close and dispose stream because it is being reused for data
    if (result != 0)
    {
        Task.Run(() => TryReuseReceiverStream(stream));
    }
}