private void DeployMove()

in src/CRA.ClientLibrary/DataProcessing/Datasets/DeployableShardedDataset.cs [108:176]


        /// <summary>
        /// Deploys the move step: builds a <see cref="ShuffleTask"/> from <c>_moveDescriptor</c>,
        /// deploys <c>_input1</c> into it, attaches the split/merge transforms, registers the
        /// shuffle operator and its edges in <paramref name="topology"/>, and finally points
        /// <paramref name="task"/> at the shuffle task's output.
        /// </summary>
        /// <param name="task">
        /// Task of the next operation. Its first input id, its input key/payload/dataset types
        /// and its <c>NextInputIds</c> are overwritten by this method.
        /// </param>
        /// <param name="topology">
        /// Topology that receives the shuffle operator together with its input and output edges.
        /// </param>
        private void DeployMove(ref TaskBase task, ref OperatorsToplogy topology)
        {
            // Read up front; it decides which slot of the next input ids gets filled at the end.
            bool feedsRightOperand = task.IsRightOperandInput;

            TaskBase moveTask = new ShuffleTask(_moveDescriptor);
            moveTask.OperationTypes = TransformUtils.FillBinaryTransformTypes(
                typeof(TKeyI1), typeof(TPayloadI1), typeof(TDataSetI1),
                typeof(TKeyI2), typeof(TPayloadI2), typeof(TDataSetI2),
                typeof(TKeyO), typeof(TPayloadO), typeof(TDataSetO));
            moveTask.IsRightOperandInput = false;

            // Deploy the first input into the shuffle task and hand the collected transforms to it.
            OperatorTransforms upstreamTransforms = new OperatorTransforms();
            IDeployable upstream = _input1 as IDeployable;
            upstream.Deploy(ref moveTask, ref topology, ref upstreamTransforms);
            moveTask.PrepareTaskTransformations(upstreamTransforms);

            // Deploy receives the task by ref, so the typed view is taken only after that call.
            ShuffleTask shuffle = moveTask as ShuffleTask;
            shuffle.MapperVertexName = "shufflemapper" + Guid.NewGuid().ToString();
            shuffle.ReducerVertexName = typeof(ShardedShuffleOperator).Name.ToLower() + Guid.NewGuid().ToString();

            // The pending "next" input ids become the shuffle task's own inputs;
            // its output is identified by the reducer vertex name.
            moveTask.InputIds.SetInputId1(moveTask.NextInputIds.InputId1);
            moveTask.InputIds.SetInputId2(moveTask.NextInputIds.InputId2);
            moveTask.OutputId = shuffle.ReducerVertexName;

            OperatorTransforms moveTransforms = new OperatorTransforms();

            // Split transform, built from the serialized splitter.
            var serializedSplitter = SerializationHelper.Serialize(_splitter);
            string splitKind = OperatorType.MoveSplit.ToString();
            string splitTypes = TransformUtils.FillBinaryTransformTypes(
                typeof(TKeyI1), typeof(TPayloadI1), typeof(TDataSetI1),
                typeof(TKeyI2), typeof(TPayloadI2), typeof(TDataSetI2),
                typeof(TKeyO), typeof(TPayloadO), typeof(TDataSetO)).ToString();
            moveTransforms.AddTransform(serializedSplitter, splitKind, splitTypes, moveTask.InputIds);

            // Merge transform, built from the serialized merger.
            var serializedMerger = SerializationHelper.Serialize(_merger);
            string mergeKind = OperatorType.MoveMerge.ToString();
            string mergeTypes = TransformUtils.FillBinaryTransformTypes(
                typeof(TKeyI1), typeof(TPayloadI1), typeof(TDataSetI1),
                typeof(TKeyI2), typeof(TPayloadI2), typeof(TDataSetI2),
                typeof(TKeyO), typeof(TPayloadO), typeof(TDataSetO)).ToString();
            moveTransforms.AddTransform(serializedMerger, mergeKind, mergeTypes, moveTask.InputIds);

            shuffle.PrepareShuffleTransformations(moveTransforms);

            // Register the operator, then wire both of its inputs to the mapper vertex in each direction.
            var mapperVertex = shuffle.MapperVertexName;
            topology.AddShuffleOperator(mapperVertex, shuffle.ReducerVertexName, shuffle);
            topology.AddOperatorInput(mapperVertex, moveTask.InputIds.InputId1);
            topology.AddOperatorSecondaryInput(mapperVertex, moveTask.InputIds.InputId2);
            topology.AddOperatorOutput(moveTask.InputIds.InputId1, mapperVertex);
            topology.AddOperatorOutput(moveTask.InputIds.InputId2, mapperVertex);

            if (moveTask.Transforms != null)
            {
                // Pass 1: every transform's second input becomes a secondary input of the mapper.
                foreach (OperatorInputs transformInputs in moveTask.TransformsInputs)
                {
                    topology.AddOperatorSecondaryInput(mapperVertex, transformInputs.InputId2);
                    topology.AddOperatorOutput(transformInputs.InputId2, mapperVertex);
                }

                // Pass 2: a transform's first input is added as a primary input only when it is
                // not already a secondary input of the mapper. This must stay a separate pass so
                // the check sees everything registered in pass 1.
                foreach (OperatorInputs transformInputs in moveTask.TransformsInputs)
                {
                    if (!topology.ContainsSecondaryOperatorInput(mapperVertex, transformInputs.InputId1))
                    {
                        topology.AddOperatorInput(mapperVertex, transformInputs.InputId1);
                        topology.AddOperatorOutput(transformInputs.InputId1, mapperVertex);
                    }
                }
            }

            // Update the inputs and types for the next operation
            task.InputIds.SetInputId1(moveTask.OutputId);
            task.OperationTypes.SetInputKeyType(typeof(TKeyO));
            task.OperationTypes.SetInputPayloadType(typeof(TPayloadO));
            task.OperationTypes.SetInputDatasetType(typeof(TDataSetO));

            // The shuffle output goes into the second slot when the task is a right operand,
            // otherwise into the first.
            OperatorInputs nextInputs = new OperatorInputs();
            if (feedsRightOperand)
            {
                nextInputs.InputId2 = moveTask.OutputId;
            }
            else
            {
                nextInputs.InputId1 = moveTask.OutputId;
            }

            task.NextInputIds = nextInputs;
        }