in src/CRA.ClientLibrary/DataProcessing/Definitions/Operator/OperatorsToplogy.cs [204:301]
/// <summary>
/// Rebuilds the transform lists of every task in the topology.
/// </summary>
/// <remarks>
/// Nothing happens when the topology holds fewer than two tasks. Otherwise, for each task:
/// <list type="bullet">
/// <item>its transforms are appended, in order, to the task that
/// <c>DeploymentUtils.RetrieveTaskIndexOfOperator</c> resolves for the transform's first input,
/// until a target is met whose collected list already ends with a <c>MoveSplit</c> operation;</item>
/// <item>every transform from that point on is collected on the task itself;</item>
/// <item>for a binary transform that was handed over, <c>UpdateOperatorsSecondaryInput</c> is called
/// with the endpoint names of both tasks (shuffle vertex names for <c>Move</c> tasks, operator ids otherwise);</item>
/// <item>a <c>Move</c> task additionally hands its first shuffle transform and its shuffle descriptor
/// to the task resolved for that transform's first input, under the same <c>MoveSplit</c> condition.</item>
/// </list>
/// Finally the collected lists replace the transform arrays of each task and
/// <c>_operatorsTasks</c> is replaced by a new list holding the same tasks.
/// </remarks>
internal void PrepareFinalOperatorsTasks()
{
    string[] operatorsIds = _operatorsIds.ToArray();
    TaskBase[] tasks = _operatorsTasks.ToArray();
    int taskCount = tasks.Length;

    // A topology with fewer than two tasks is left exactly as it is.
    if (taskCount < 2)
    {
        return;
    }

    string moveSplitName = OperatorType.MoveSplit.ToString();
    string binaryTransformName = OperatorType.BinaryTransform.ToString();

    // Four parallel per-task accumulators; entry n of each list describes the same transform.
    var pendingTransforms = new List<string>[taskCount];
    var pendingOperations = new List<string>[taskCount];
    var pendingTypes = new List<string>[taskCount];
    var pendingInputs = new List<OperatorInputs>[taskCount];
    for (int slot = 0; slot < taskCount; slot++)
    {
        pendingTransforms[slot] = new List<string>();
        pendingOperations[slot] = new List<string>();
        pendingTypes[slot] = new List<string>();
        pendingInputs[slot] = new List<OperatorInputs>();
    }

    for (int taskIndex = 0; taskIndex < operatorsIds.Length; taskIndex++)
    {
        TaskBase current = tasks[taskIndex];

        if (current.Transforms != null)
        {
            // Number of leading transforms already handed over to other tasks.
            int forwarded = 0;
            while (forwarded < current.Transforms.Length)
            {
                int producerIndex = DeploymentUtils.RetrieveTaskIndexOfOperator(
                    current.TransformsInputs[forwarded].InputId1, operatorsIds);

                // Stop handing transforms over once the target's list ends with a MoveSplit.
                int producerCount = pendingTransforms[producerIndex].Count;
                if (producerCount > 0 && pendingOperations[producerIndex][producerCount - 1] == moveSplitName)
                {
                    break;
                }

                pendingTransforms[producerIndex].Add(current.Transforms[forwarded]);
                pendingOperations[producerIndex].Add(current.TransformsOperations[forwarded]);
                pendingTypes[producerIndex].Add(current.TransformsTypes[forwarded]);
                pendingInputs[producerIndex].Add(current.TransformsInputs[forwarded]);

                if (current.TransformsOperations[forwarded] == binaryTransformName)
                {
                    var secondaryInput = current.TransformsInputs[forwarded].InputId2;

                    // A Move task is addressed through its shuffle vertex names instead of its operator id.
                    string producerEndpoint;
                    if (tasks[producerIndex].OperationType == OperatorType.Move)
                    {
                        producerEndpoint = ((ShuffleTask)tasks[producerIndex]).ReducerVertexName;
                    }
                    else
                    {
                        producerEndpoint = operatorsIds[producerIndex];
                    }

                    string consumerEndpoint;
                    if (current.OperationType == OperatorType.Move)
                    {
                        consumerEndpoint = ((ShuffleTask)current).MapperVertexName;
                    }
                    else
                    {
                        consumerEndpoint = operatorsIds[taskIndex];
                    }

                    UpdateOperatorsSecondaryInput(producerEndpoint, consumerEndpoint, secondaryInput);
                }

                forwarded++;
            }

            // Everything that was not handed over is collected on the task itself.
            for (int rest = forwarded; rest < current.Transforms.Length; rest++)
            {
                pendingTransforms[taskIndex].Add(current.Transforms[rest]);
                pendingOperations[taskIndex].Add(current.TransformsOperations[rest]);
                pendingTypes[taskIndex].Add(current.TransformsTypes[rest]);
                pendingInputs[taskIndex].Add(current.TransformsInputs[rest]);
            }
        }

        if (current.OperationType == OperatorType.Move)
        {
            var shuffle = (ShuffleTask)current;
            int producerIndex = DeploymentUtils.RetrieveTaskIndexOfOperator(
                shuffle.ShuffleTransformsInputs[0].InputId1, operatorsIds);

            int producerCount = pendingTransforms[producerIndex].Count;
            bool producerEndsWithSplit =
                producerCount > 0 && pendingOperations[producerIndex][producerCount - 1] == moveSplitName;

            if (!producerEndsWithSplit)
            {
                pendingTransforms[producerIndex].Add(shuffle.ShuffleTransforms[0]);
                pendingOperations[producerIndex].Add(shuffle.ShuffleTransformsOperations[0]);
                pendingTypes[producerIndex].Add(shuffle.ShuffleTransformsTypes[0]);
                pendingInputs[producerIndex].Add(shuffle.ShuffleTransformsInputs[0]);
                tasks[producerIndex].SecondaryShuffleDescriptor = shuffle.ShuffleDescriptor;
            }
        }
    }

    // Publish the collected lists back onto the tasks.
    for (int slot = 0; slot < taskCount; slot++)
    {
        TaskBase target = tasks[slot];
        target.Transforms = pendingTransforms[slot].ToArray();
        target.TransformsOperations = pendingOperations[slot].ToArray();
        target.TransformsTypes = pendingTypes[slot].ToArray();
        target.TransformsInputs = pendingInputs[slot].ToArray();
    }

    _operatorsTasks = new List<TaskBase>(tasks);
}