in src/CRA.ClientLibrary/Main/CRAWorker.cs [875:989]
/// <summary>
/// Feeds an incoming stream into the local input endpoint identified by
/// <paramref name="toVertexName"/> / <paramref name="toVertexInput"/>, then removes the
/// connection's entry from <c>inConnections</c> and recycles the stream. If anything throws,
/// the entry is removed and <c>RetryRestoreConnection</c> is invoked for the same connection.
/// </summary>
/// <param name="fromVertexName">Name of the sending vertex (may carry a shard suffix when sharding is used).</param>
/// <param name="fromVertexOutput">Name of the output endpoint on the sending vertex.</param>
/// <param name="toVertexName">Name of the local receiving vertex; used as the key into <c>_localVertexTable</c>.</param>
/// <param name="toVertexInput">Name of the input endpoint on the receiving vertex.</param>
/// <param name="reverse">Not read by this method.</param>
/// <param name="ns">Stream carrying the incoming data.</param>
/// <param name="source">Cancellation source whose token is passed to the endpoint's read call.</param>
/// <param name="address">
/// When non-null together with a <paramref name="port"/> other than -1, the stream is offered to
/// the sender stream pool after a successful transfer; otherwise it is handed to
/// <c>TryReuseReceiverStream</c>.
/// </param>
/// <param name="port">Port paired with <paramref name="address"/>; -1 means "not supplied".</param>
/// <param name="sharding">
/// When true, <paramref name="toVertexInput"/> is split into an endpoint name and a shard id via
/// <c>GetShardedVertexName</c> / <c>GetShardedVertexShardId</c>.
/// </param>
/// <returns>A task that completes when the transfer and its clean-up (or the retry call) have finished.</returns>
private async Task IngressFromStream(
    string fromVertexName,
    string fromVertexOutput,
    string toVertexName,
    string toVertexInput,
    bool reverse,
    Stream ns,
    CancellationTokenSource source,
    string address = null,
    int port = -1,
    bool sharding = true)
{
    // Key of this connection's entry in inConnections; used by both the success and failure paths.
    string connectionKey = fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput;

    try
    {
        string key = toVertexInput;
        int shardId = -1;
        //#if SHARDING
        if (sharding)
        {
            key = GetShardedVertexName(toVertexInput);
            shardId = GetShardedVertexShardId(toVertexInput);
            if (shardId >= 0)
            {
                var toBaseName = GetShardedVertexName(toVertexName);
                var fromBaseName = GetShardedVertexName(fromVertexName);
                var shardingKey = toBaseName + ":" + key + ":" + fromBaseName;

                // Loop until the cached sharding info for this (to, input, from) triple either
                // already lists the sender's shard or has been replaced/added by this call.
                // TryUpdate/TryAdd can lose to a concurrent writer, in which case we re-check.
                while (true)
                {
                    if (shardingInfoTable.ContainsKey(shardingKey))
                    {
                        var si = shardingInfoTable[shardingKey];
                        if (si.AllShards.Contains(GetShardedVertexShardId(fromVertexName)))
                        {
                            break;
                        }

                        var newSI = await _craClient.GetShardingInfoAsync(fromBaseName);
                        if (shardingInfoTable.TryUpdate(shardingKey, newSI, si))
                        {
                            ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key]).UpdateShardingInfo(fromBaseName, newSI);
                            break;
                        }
                    }
                    else
                    {
                        var newSI = await _craClient.GetShardingInfoAsync(fromBaseName);
                        if (shardingInfoTable.TryAdd(shardingKey, newSI))
                        {
                            ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key]).UpdateShardingInfo(fromBaseName, newSI);
                            break;
                        }
                    }
                }
            }
        }
        //#endif

        if (_localVertexTable[toVertexName].InputEndpoints.ContainsKey(key))
        {
            // Synchronous endpoints have no sharded read path.
            if (shardId >= 0)
            {
                throw new NotImplementedException();
            }

            // The endpoint's FromStream is synchronous, so run it off the calling context.
            await Task.Run(
                () => _localVertexTable[toVertexName].InputEndpoints[toVertexInput]
                    .FromStream(ns, fromVertexName, fromVertexOutput, source.Token), source.Token);
        }
        else if (_localVertexTable[toVertexName].AsyncInputEndpoints.ContainsKey(key))
        {
            if (shardId < 0)
            {
                await _localVertexTable[toVertexName].AsyncInputEndpoints[toVertexInput].FromStreamAsync(ns, fromVertexName, fromVertexOutput, source.Token);
            }
            else
            {
                await ((IAsyncShardedVertexInputEndpoint)_localVertexTable[toVertexName].AsyncInputEndpoints[key])
                    .FromStreamAsync(ns, GetShardedVertexName(fromVertexName),
                        shardId, GetShardedVertexName(fromVertexOutput), source.Token);
            }
        }
        else
        {
            Trace.TraceError("Unable to find input endpoint (on to side)");
            return;
        }

        // Completed FromStream successfully
        CancellationTokenSource oldSource;
        if (inConnections.TryRemove(connectionKey, out oldSource))
        {
            oldSource.Dispose();
            if (address != null && port != -1)
            {
                // Add/Return a sender stream connection to the pool.
                // ns is declared as Stream, so a hard cast could throw InvalidCastException here,
                // after the transfer has already succeeded; the catch below would then treat it
                // as a stream failure and reconnect. A stream that is not a NetworkStream simply
                // cannot be pooled and is disposed, like one the pool rejects.
                var poolableStream = ns as NetworkStream;
                if (poolableStream == null
                    || !_craClient.TryAddSenderStreamToPool(address, port.ToString(), poolableStream))
                {
                    ns.Dispose();
                }
            }
            else
            {
                // Keep a receiver stream connection to be used later (fire-and-forget by design)
#pragma warning disable CS4014
                Task.Run(() => TryReuseReceiverStream(ns));
#pragma warning restore CS4014
            }
        }
    }
    catch (Exception e)
    {
        Trace.TraceInformation("Exception (" + e.ToString() + ") in incoming stream - reconnecting");
        CancellationTokenSource tokenSource;
        if (inConnections.TryRemove(connectionKey, out tokenSource))
        {
            tokenSource.Dispose();
        }
        else
        {
            Trace.TraceError("Unexpected: caught exception in FromStream but entry absent in inConnections");
        }
        await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, true);
    }
}