in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedProducerOperator.cs [44:109]
/// <summary>
/// Sets up the synchronization primitives and the async input/output endpoints of this
/// producer for the given shard, based on the connections registered for this vertex.
/// </summary>
/// <param name="shardId">Identifier of the shard being initialized; forwarded to the
/// endpoint objects and to <c>CreateAndTransformDataset</c>.</param>
/// <param name="shardingInfo">Sharding description; its <c>AllShards.Length</c> is used as
/// the shard count for countdown sizing and endpoint construction.</param>
/// <exception cref="NotImplementedException">Thrown when an input endpoint's connection
/// tuple does not have <c>Item4</c> set (i.e. it is not a secondary input).</exception>
internal override void InitializeOperator(int shardId, ShardingInfo shardingInfo)
{
    _hasSplittedOutput = HasSplittedOutput();

    // The portion of the vertex name before the first '$' is the key used in both
    // connection maps. It is loop-invariant, so split once instead of on every lookup.
    string vertexKey = VertexName.Split('$')[0];

    string[] toEndpoints = GetEndpointNamesForVertex(vertexKey, _toFromConnections);
    string[] fromEndpoints = GetEndpointNamesForVertex(vertexKey, _fromToConnections);

    // Classify output endpoints by the Item4 flag of their connection tuple.
    int secondaryOutputsCount = 0;
    int ordinaryOutputsCount = 0;
    foreach (var fromEndpoint in fromEndpoints)
    {
        var toTuple = _fromToConnections[new Tuple<string, string>(vertexKey, fromEndpoint)];
        if (toTuple.Item4)
        {
            secondaryOutputsCount++;
        }
        else
        {
            ordinaryOutputsCount++;
        }
    }

    // With a splitted output the countdown is sized by the shard count rather than
    // by the number of ordinary output endpoints.
    int deployProduceInputCount = secondaryOutputsCount;
    if (_hasSplittedOutput)
    {
        deployProduceInputCount += shardingInfo.AllShards.Length;
    }
    else
    {
        deployProduceInputCount += ordinaryOutputsCount;
    }

    _deployProduceInput = new CountdownEvent(deployProduceInputCount);
    _runProduceInput = new CountdownEvent(deployProduceInputCount);

    // Count input endpoints whose connection tuple has Item4 set.
    int secondaryInputsCount = 0;
    foreach (var toEndpoint in toEndpoints)
    {
        var fromTuple = _toFromConnections[new Tuple<string, string>(vertexKey, toEndpoint)];
        if (fromTuple.Item4)
        {
            secondaryInputsCount++;
        }
    }

    _deployProduceOutput = new CountdownEvent(secondaryInputsCount);
    _runProduceOutput = new CountdownEvent(secondaryInputsCount);
    _continueAfterTransformation = new CountdownEvent(1);

    // Only ever raised here; an already-set flag is left untouched.
    if (secondaryInputsCount > 0)
    {
        _hasSecondaryInput = true;
    }

    // Kept as a separate pass so the countdown fields above are assigned before any
    // unsupported endpoint causes a throw.
    foreach (var toEndpoint in toEndpoints)
    {
        var fromTuple = _toFromConnections[new Tuple<string, string>(vertexKey, toEndpoint)];
        if (!fromTuple.Item4)
        {
            throw new NotImplementedException("Shared input endpoints are not supported in produce operators!!");
        }

        _startCreatingSecondaryDatasets[fromTuple.Item1] = new CountdownEvent(1);
        _finishCreatingSecondaryDatasets[fromTuple.Item1] = new CountdownEvent(1);
        AddAsyncInputEndpoint(
            toEndpoint,
            new ShardedProducerSecondaryInput(this, shardId, shardingInfo.AllShards.Length, toEndpoint));
    }

    // Without secondary inputs there is nothing to wait for: build and transform the
    // dataset right away and release anything waiting on the transformation.
    if (!_hasSecondaryInput)
    {
        CreateAndTransformDataset(shardId);
        _isTransformationsApplied = true;
        _continueAfterTransformation.Signal();
    }

    // Register one output endpoint object per outgoing connection, picking the
    // secondary variant when the connection tuple has Item4 set.
    foreach (var fromEndpoint in fromEndpoints)
    {
        var toTuple = _fromToConnections[new Tuple<string, string>(vertexKey, fromEndpoint)];
        if (!toTuple.Item4)
        {
            AddAsyncOutputEndpoint(
                fromEndpoint,
                new ShardedProducerOutput(this, shardId, shardingInfo.AllShards.Length, fromEndpoint));
        }
        else
        {
            AddAsyncOutputEndpoint(
                fromEndpoint,
                new ShardedProducerSecondaryOutput(this, shardId, shardingInfo.AllShards.Length, fromEndpoint));
        }
    }
}