in src/CRA.ClientLibrary/DataProcessing/Datasets/DeployableShardedDataset.cs [108:176]
/// <summary>
/// Deploys a "move" of this dataset as a <see cref="ShuffleTask"/>. The upstream dataset
/// (<c>_input1</c>) is deployed into the shuffle task. <c>_splitter</c> (as
/// <c>OperatorType.MoveSplit</c>) and <c>_merger</c> (as <c>OperatorType.MoveMerge</c>) are
/// registered as the shuffle's transforms. The shuffle's mapper/reducer vertices and their
/// edges are then added to <paramref name="topology"/>. Finally <paramref name="task"/> is
/// pointed at the shuffle's reducer output.
/// </summary>
/// <param name="task">
/// The downstream task that consumes the shuffle output. Its first input id, its input
/// key/payload/dataset types (set to the output types) and its <c>NextInputIds</c> are updated.
/// </param>
/// <param name="topology">The operator topology being built; shuffle vertices and edges are added to it.</param>
/// <exception cref="InvalidOperationException">
/// Thrown if deploying the input replaces the shuffle task with a task that is not a <see cref="ShuffleTask"/>.
/// </exception>
private void DeployMove(ref TaskBase task, ref OperatorsToplogy topology)
{
    // Read before anything else; decides which slot of NextInputIds receives the shuffle output.
    var isRightOperandInput = task.IsRightOperandInput;

    TaskBase shuffleTask = new ShuffleTask(_moveDescriptor);
    shuffleTask.OperationTypes = TransformUtils.FillBinaryTransformTypes(
        typeof(TKeyI1), typeof(TPayloadI1), typeof(TDataSetI1),
        typeof(TKeyI2), typeof(TPayloadI2), typeof(TDataSetI2),
        typeof(TKeyO), typeof(TPayloadO), typeof(TDataSetO));
    shuffleTask.IsRightOperandInput = false;

    // Deploy the upstream dataset into the shuffle task. Deploy receives the task by ref and
    // may replace it, so the typed ShuffleTask view is only taken after this call.
    OperatorTransforms shuffleInputTransforms = new OperatorTransforms();
    (_input1 as IDeployable).Deploy(ref shuffleTask, ref topology, ref shuffleInputTransforms);
    shuffleTask.PrepareTaskTransformations(shuffleInputTransforms);

    // Single checked cast instead of repeated unchecked 'as' casts.
    var shuffle = shuffleTask as ShuffleTask;
    if (shuffle == null)
    {
        throw new InvalidOperationException(
            "Deploying the move input replaced the shuffle task with a task that is not a ShuffleTask.");
    }

    // A fresh GUID suffix keeps each deployment's vertex names distinct.
    // Invariant lowercasing: the vertex name is an identifier, not user-facing text.
    shuffle.MapperVertexName = "shufflemapper" + Guid.NewGuid().ToString();
    shuffle.ReducerVertexName = typeof(ShardedShuffleOperator).Name.ToLowerInvariant() + Guid.NewGuid().ToString();
    shuffleTask.InputIds.SetInputId1(shuffleTask.NextInputIds.InputId1);
    shuffleTask.InputIds.SetInputId2(shuffleTask.NextInputIds.InputId2);
    shuffleTask.OutputId = shuffle.ReducerVertexName;

    // Both shuffle transforms share the same binary type signature; build it once.
    string shuffleTransformTypes = TransformUtils.FillBinaryTransformTypes(
        typeof(TKeyI1), typeof(TPayloadI1), typeof(TDataSetI1),
        typeof(TKeyI2), typeof(TPayloadI2), typeof(TDataSetI2),
        typeof(TKeyO), typeof(TPayloadO), typeof(TDataSetO)).ToString();

    OperatorTransforms shuffleTransforms = new OperatorTransforms();
    shuffleTransforms.AddTransform(SerializationHelper.Serialize(_splitter),
        OperatorType.MoveSplit.ToString(),
        shuffleTransformTypes,
        shuffleTask.InputIds);
    shuffleTransforms.AddTransform(SerializationHelper.Serialize(_merger),
        OperatorType.MoveMerge.ToString(),
        shuffleTransformTypes,
        shuffleTask.InputIds);
    shuffle.PrepareShuffleTransformations(shuffleTransforms);

    // Register the mapper/reducer pair and connect the shuffle's two inputs to the mapper.
    string mapperVertexName = shuffle.MapperVertexName;
    topology.AddShuffleOperator(mapperVertexName, shuffle.ReducerVertexName, shuffle);
    topology.AddOperatorInput(mapperVertexName, shuffleTask.InputIds.InputId1);
    topology.AddOperatorSecondaryInput(mapperVertexName, shuffleTask.InputIds.InputId2);
    topology.AddOperatorOutput(shuffleTask.InputIds.InputId1, mapperVertexName);
    topology.AddOperatorOutput(shuffleTask.InputIds.InputId2, mapperVertexName);

    if (shuffleTask.Transforms != null)
    {
        // Every recorded transform's second input becomes a secondary input of the mapper.
        foreach (OperatorInputs inputs in shuffleTask.TransformsInputs)
        {
            topology.AddOperatorSecondaryInput(mapperVertexName, inputs.InputId2);
            topology.AddOperatorOutput(inputs.InputId2, mapperVertexName);
        }

        // First inputs become primary inputs, unless already registered as secondary inputs above.
        foreach (OperatorInputs inputs in shuffleTask.TransformsInputs)
        {
            if (!topology.ContainsSecondaryOperatorInput(mapperVertexName, inputs.InputId1))
            {
                topology.AddOperatorInput(mapperVertexName, inputs.InputId1);
                topology.AddOperatorOutput(inputs.InputId1, mapperVertexName);
            }
        }
    }

    // Update the inputs and types for the next operation.
    // NOTE(review): InputId1 is set even when isRightOperandInput is true; confirm this is intended.
    task.InputIds.SetInputId1(shuffleTask.OutputId);
    task.OperationTypes.SetInputKeyType(typeof(TKeyO));
    task.OperationTypes.SetInputPayloadType(typeof(TPayloadO));
    task.OperationTypes.SetInputDatasetType(typeof(TDataSetO));

    OperatorInputs temporaryInputs = new OperatorInputs();
    if (isRightOperandInput)
    {
        temporaryInputs.InputId2 = shuffleTask.OutputId;
    }
    else
    {
        temporaryInputs.InputId1 = shuffleTask.OutputId;
    }
    task.NextInputIds = temporaryInputs;
}