in src/CRA.ClientLibrary/Main/CRAWorker.cs [644:762]
private async Task EgressToStream(
string fromVertexName,
string fromVertexOutput,
string toVertexName,
string toVertexInput,
bool reverse,
Stream ns,
CancellationTokenSource source,
string address = null,
int port = -1,
bool sharding = true)
{
try
{
string key = fromVertexOutput;
int shardId = -1;
//#if SHARDING
if (sharding)
{
key = GetShardedVertexName(fromVertexOutput);
shardId = GetShardedVertexShardId(fromVertexOutput);
if (shardId >= 0)
{
var skey = GetShardedVertexName(fromVertexName) + ":" + key + ":" + GetShardedVertexName(toVertexName);
while (true)
{
if (shardingInfoTable.ContainsKey(skey))
{
var si = shardingInfoTable[skey];
if (si.AllShards.Contains(GetShardedVertexShardId(toVertexName)))
break;
var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(toVertexName));
if (shardingInfoTable.TryUpdate(skey, newSI, si))
{
((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(toVertexName), newSI);
break;
}
}
else
{
var newSI = await _craClient.GetShardingInfoAsync(GetShardedVertexName(toVertexName));
if (shardingInfoTable.TryAdd(skey, newSI))
{
((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key]).UpdateShardingInfo(GetShardedVertexName(toVertexName), newSI);
break;
}
}
}
}
}
//#endif
if (_localVertexTable[fromVertexName].OutputEndpoints.ContainsKey(key))
{
if (shardId < 0)
await
Task.Run(() =>
_localVertexTable[fromVertexName].OutputEndpoints[fromVertexOutput]
.ToStream(ns, toVertexName, toVertexInput, source.Token), source.Token);
else
throw new NotImplementedException();
}
else if (_localVertexTable[fromVertexName].AsyncOutputEndpoints.ContainsKey(key))
{
if (shardId < 0)
await _localVertexTable[fromVertexName].AsyncOutputEndpoints[fromVertexOutput].ToStreamAsync(ns, toVertexName, toVertexInput, source.Token);
else
await ((IAsyncShardedVertexOutputEndpoint)_localVertexTable[fromVertexName].AsyncOutputEndpoints[key])
.ToStreamAsync(ns, GetShardedVertexName(toVertexName),
shardId, GetShardedVertexName(toVertexInput), source.Token);
}
else
{
Trace.TraceInformation("Unable to find output endpoint (on from side)");
return;
}
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
{
oldSource.Dispose();
if (address != null && port != -1)
{
// Add/Return a sender stream connection to the pool
if (!_craClient.TryAddSenderStreamToPool(address, port.ToString(), (NetworkStream)ns))
{
ns.Dispose();
}
}
else
{
// Keep a receiver stream connection to be used later
#pragma warning disable CS4014
Task.Run(() => TryReuseReceiverStream(ns));
#pragma warning restore CS4014
}
await _craClient.DisconnectAsync(fromVertexName, fromVertexOutput, toVertexName, toVertexInput);
}
}
catch (Exception e)
{
Trace.TraceInformation("Exception (" + e.ToString() + ") in outgoing stream - reconnecting");
CancellationTokenSource oldSource;
if (outConnections.TryRemove(fromVertexName + ":" + fromVertexOutput + ":" + toVertexName + ":" + toVertexInput, out oldSource))
{
oldSource.Dispose();
}
else
{
Trace.TraceError("Unexpected: caught exception in ToStream but entry absent in outConnections");
}
// Retry following while connection not in list
await RetryRestoreConnection(fromVertexName, fromVertexOutput, toVertexName, toVertexInput, false);
}
}