internal void PrepareFinalOperatorsTasks()

in src/CRA.ClientLibrary/DataProcessing/Definitions/Operator/OperatorsToplogy.cs [204:301]


        /// <summary>
        /// Redistributes the transforms recorded on the operator tasks and stores the
        /// resulting task list back into <c>_operatorsTasks</c>.
        /// </summary>
        /// <remarks>
        /// For every task, its transforms are walked in order. Each transform is appended
        /// to the task whose index <c>DeploymentUtils.RetrieveTaskIndexOfOperator</c> reports
        /// for the transform's first input, until a target task is found whose accumulated
        /// operations already end with <c>OperatorType.MoveSplit</c>; from that transform on,
        /// the remaining transforms stay with the task that owns them. Transforms whose
        /// operation is <c>OperatorType.BinaryTransform</c> additionally trigger
        /// <c>UpdateOperatorsSecondaryInput</c>. Tasks whose operation type is
        /// <c>OperatorType.Move</c> also hand their first shuffle transform (and their
        /// shuffle descriptor) to the task of that transform's first input, under the same
        /// MoveSplit restriction. When there are fewer than two tasks nothing is changed.
        /// </remarks>
        internal void PrepareFinalOperatorsTasks()
        {
            string[] operatorsIds = _operatorsIds.ToArray();
            TaskBase[] tasks = _operatorsTasks.ToArray();

            // Fewer than two tasks: nothing is redistributed and the fields stay untouched.
            if (tasks.Length < 2)
            {
                return;
            }

            // One set of accumulators per task; the four lists of a task always grow together.
            int taskCount = tasks.Length;
            List<string>[] pendingTransforms = new List<string>[taskCount];
            List<string>[] pendingOperations = new List<string>[taskCount];
            List<string>[] pendingTypes = new List<string>[taskCount];
            List<OperatorInputs>[] pendingInputs = new List<OperatorInputs>[taskCount];

            for (int slot = 0; slot < taskCount; slot++)
            {
                pendingTransforms[slot] = new List<string>();
                pendingOperations[slot] = new List<string>();
                pendingTypes[slot] = new List<string>();
                pendingInputs[slot] = new List<OperatorInputs>();
            }

            string moveSplitName = OperatorType.MoveSplit.ToString();
            string binaryTransformName = OperatorType.BinaryTransform.ToString();

            for (int taskIndex = 0; taskIndex < operatorsIds.Length; taskIndex++)
            {
                TaskBase current = tasks[taskIndex];

                if (current.Transforms != null)
                {
                    // Position of the first transform that has not been handed to another task.
                    int cursor = 0;

                    while (cursor < current.Transforms.Length)
                    {
                        var primaryInput = current.TransformsInputs[cursor].InputId1;
                        int targetIndex = DeploymentUtils.RetrieveTaskIndexOfOperator(primaryInput, operatorsIds);

                        bool targetEndsWithMoveSplit =
                            pendingTransforms[targetIndex].Count > 0 &&
                            pendingOperations[targetIndex][pendingTransforms[targetIndex].Count - 1] == moveSplitName;

                        // A target already closed by a MoveSplit takes no more transforms;
                        // everything from here on remains with the current task.
                        if (targetEndsWithMoveSplit)
                        {
                            break;
                        }

                        pendingTransforms[targetIndex].Add(current.Transforms[cursor]);
                        pendingOperations[targetIndex].Add(current.TransformsOperations[cursor]);
                        pendingTypes[targetIndex].Add(current.TransformsTypes[cursor]);
                        pendingInputs[targetIndex].Add(current.TransformsInputs[cursor]);

                        if (current.TransformsOperations[cursor] == binaryTransformName)
                        {
                            var secondaryInput = current.TransformsInputs[cursor].InputId2;

                            bool targetIsMove = tasks[targetIndex].OperationType == OperatorType.Move;
                            bool currentIsMove = current.OperationType == OperatorType.Move;

                            // Move tasks are addressed through their shuffle vertex names
                            // (reducer for the target, mapper for the current task) instead
                            // of their operator ids.
                            if (targetIsMove)
                            {
                                if (currentIsMove)
                                {
                                    UpdateOperatorsSecondaryInput(((ShuffleTask)tasks[targetIndex]).ReducerVertexName, ((ShuffleTask)current).MapperVertexName, secondaryInput);
                                }
                                else
                                {
                                    UpdateOperatorsSecondaryInput(((ShuffleTask)tasks[targetIndex]).ReducerVertexName, operatorsIds[taskIndex], secondaryInput);
                                }
                            }
                            else
                            {
                                if (currentIsMove)
                                {
                                    UpdateOperatorsSecondaryInput(operatorsIds[targetIndex], ((ShuffleTask)current).MapperVertexName, secondaryInput);
                                }
                                else
                                {
                                    UpdateOperatorsSecondaryInput(operatorsIds[targetIndex], operatorsIds[taskIndex], secondaryInput);
                                }
                            }
                        }

                        cursor++;
                    }

                    // Whatever was not handed over stays with the task that declared it.
                    for (; cursor < current.Transforms.Length; cursor++)
                    {
                        pendingTransforms[taskIndex].Add(current.Transforms[cursor]);
                        pendingOperations[taskIndex].Add(current.TransformsOperations[cursor]);
                        pendingTypes[taskIndex].Add(current.TransformsTypes[cursor]);
                        pendingInputs[taskIndex].Add(current.TransformsInputs[cursor]);
                    }
                }

                if (current.OperationType == OperatorType.Move)
                {
                    ShuffleTask shuffle = (ShuffleTask)current;

                    var shuffleInput = shuffle.ShuffleTransformsInputs[0].InputId1;
                    int targetIndex = DeploymentUtils.RetrieveTaskIndexOfOperator(shuffleInput, operatorsIds);

                    // Same restriction as above: skip targets whose last operation is a MoveSplit.
                    if (pendingTransforms[targetIndex].Count == 0 ||
                        pendingOperations[targetIndex][pendingTransforms[targetIndex].Count - 1] != moveSplitName)
                    {
                        pendingTransforms[targetIndex].Add(shuffle.ShuffleTransforms[0]);
                        pendingOperations[targetIndex].Add(shuffle.ShuffleTransformsOperations[0]);
                        pendingTypes[targetIndex].Add(shuffle.ShuffleTransformsTypes[0]);
                        pendingInputs[targetIndex].Add(shuffle.ShuffleTransformsInputs[0]);
                        tasks[targetIndex].SecondaryShuffleDescriptor = shuffle.ShuffleDescriptor;
                    }
                }
            }

            // Publish the accumulated lists onto the tasks, replacing their previous arrays.
            for (int slot = 0; slot < tasks.Length; slot++)
            {
                tasks[slot].Transforms = pendingTransforms[slot].ToArray();
                tasks[slot].TransformsOperations = pendingOperations[slot].ToArray();
                tasks[slot].TransformsTypes = pendingTypes[slot].ToArray();
                tasks[slot].TransformsInputs = pendingInputs[slot].ToArray();
            }

            _operatorsTasks = new List<TaskBase>(tasks);
        }