in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedProducerOperator.cs [111:224]
/// <summary>
/// Creates the producer dataset for the given shard and then applies, in order, every
/// transform listed in <c>_task.Transforms</c> to it.
/// </summary>
/// <remarks>
/// The produced dataset is stored in <c>_cachedDatasets[shardId]</c> under the produce task's
/// <c>OutputId</c>. Each non-null transform result is stored in the same cache under the id of the
/// transform's first input. After every step <c>_outputKeyType</c>, <c>_outputPayloadType</c>,
/// <c>_outputDatasetType</c> and <c>_outputId</c> are updated to describe the most recent output.
/// All generic helpers are invoked through reflection, so an exception thrown inside one of them
/// surfaces as a <see cref="TargetInvocationException"/>.
/// </remarks>
/// <param name="shardId">Key of the shard whose entry in <c>_cachedDatasets</c> is read and updated.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when a transform's first input has the id "$" but no dataset object, or when a transform
/// operation is not a unary transform, binary transform or move-split.
/// </exception>
public void CreateAndTransformDataset(int shardId)
{
    var produceTask = (ProduceTask)_task;

    // Build the initial dataset by closing CreateDatasetFromExpression over the task's output types.
    MethodInfo createMethod = typeof(ShardedProducerOperator)
        .GetMethod("CreateDatasetFromExpression")
        .MakeGenericMethod(
            produceTask.OperationTypes.OutputKeyType,
            produceTask.OperationTypes.OutputPayloadType,
            produceTask.OperationTypes.OutputDatasetType);
    _cachedDatasets[shardId][produceTask.OutputId] =
        createMethod.Invoke(this, new object[] { shardId, produceTask.DataProducer });

    _outputKeyType = produceTask.OperationTypes.OutputKeyType;
    _outputPayloadType = produceTask.OperationTypes.OutputPayloadType;
    _outputDatasetType = produceTask.OperationTypes.OutputDatasetType;
    _outputId = produceTask.OutputId;

    if (_task.Transforms == null)
    {
        return;
    }

    // The operation kinds are compared by name; compute the names once rather than per iteration.
    string unaryKind = OperatorType.UnaryTransform.ToString();
    string binaryKind = OperatorType.BinaryTransform.ToString();
    string splitKind = OperatorType.MoveSplit.ToString();

    for (int i = 0; i < _task.Transforms.Length; i++)
    {
        object dataset1 = null; string dataset1Id = null;
        object dataset2 = null; string dataset2Id = null;
        TransformUtils.PrepareTransformInputs(_task.TransformsInputs[i], ref dataset1, ref dataset1Id,
            ref dataset2, ref dataset2Id, _cachedDatasets[shardId]);

        string transformType = _task.TransformsOperations[i];
        MethodInfo transformMethod;
        object[] transformArguments;

        if (transformType == unaryKind)
        {
            UnaryOperatorTypes unaryTypes = new UnaryOperatorTypes();
            unaryTypes.FromString(_task.TransformsTypes[i]);
            EnsurePrimaryDatasetAvailable(dataset1Id, dataset1, i);

            transformMethod = typeof(TransformUtils).GetMethod("ApplyUnaryTransformer").MakeGenericMethod(
                unaryTypes.InputKeyType, unaryTypes.InputPayloadType, unaryTypes.InputDatasetType,
                unaryTypes.OutputKeyType, unaryTypes.OutputPayloadType, unaryTypes.OutputDatasetType);
            transformArguments = new object[] { dataset1, _task.Transforms[i] };

            _outputKeyType = unaryTypes.OutputKeyType;
            _outputPayloadType = unaryTypes.OutputPayloadType;
            _outputDatasetType = unaryTypes.OutputDatasetType;
        }
        else if (transformType == binaryKind)
        {
            BinaryOperatorTypes binaryTypes = new BinaryOperatorTypes();
            binaryTypes.FromString(_task.TransformsTypes[i]);
            EnsurePrimaryDatasetAvailable(dataset1Id, dataset1, i);

            if (dataset2Id == "$" && dataset2 == null)
            {
                // The second input is not available yet: publish the expected types, signal the
                // "start" event, block on the "finish" event, then read the dataset from the cache.
                // NOTE(review): presumably another thread builds the secondary dataset between the
                // Signal and the Wait - confirm against the code that owns these events.
                dataset2Id = _task.TransformsInputs[i].InputId2;
                _binaryOperatorTypes[dataset2Id] = binaryTypes;
                _startCreatingSecondaryDatasets[dataset2Id].Signal();
                _finishCreatingSecondaryDatasets[dataset2Id].Wait();
                dataset2 = _cachedDatasets[shardId][dataset2Id];
            }

            transformMethod = typeof(TransformUtils).GetMethod("ApplyBinaryTransformer").MakeGenericMethod(
                binaryTypes.InputKeyType, binaryTypes.InputPayloadType, binaryTypes.InputDatasetType,
                binaryTypes.SecondaryKeyType, binaryTypes.SecondaryPayloadType, binaryTypes.SecondaryDatasetType,
                binaryTypes.OutputKeyType, binaryTypes.OutputPayloadType, binaryTypes.OutputDatasetType);
            transformArguments = new object[] { dataset1, dataset2, _task.Transforms[i] };

            _outputKeyType = binaryTypes.OutputKeyType;
            _outputPayloadType = binaryTypes.OutputPayloadType;
            _outputDatasetType = binaryTypes.OutputDatasetType;
        }
        else if (transformType == splitKind)
        {
            BinaryOperatorTypes splitTypes = new BinaryOperatorTypes();
            splitTypes.FromString(_task.TransformsTypes[i]);
            EnsurePrimaryDatasetAvailable(dataset1Id, dataset1, i);

            transformMethod = typeof(MoveUtils).GetMethod("ApplySplitter").MakeGenericMethod(
                splitTypes.InputKeyType, splitTypes.InputPayloadType, splitTypes.InputDatasetType,
                splitTypes.SecondaryKeyType, splitTypes.SecondaryPayloadType, splitTypes.SecondaryDatasetType);
            transformArguments = new object[] { dataset1, _task.SecondaryShuffleDescriptor, _task.Transforms[i] };

            // A split reports the "secondary" types of its descriptor as its output types.
            _outputKeyType = splitTypes.SecondaryKeyType;
            _outputPayloadType = splitTypes.SecondaryPayloadType;
            _outputDatasetType = splitTypes.SecondaryDatasetType;
        }
        else
        {
            throw new InvalidOperationException("Error: Unsupported transformation type");
        }

        object transformOutput = transformMethod.Invoke(this, transformArguments);
        if (transformOutput != null)
        {
            // Indexer assignment adds the key when absent and overwrites it otherwise.
            _cachedDatasets[shardId][dataset1Id] = transformOutput;
        }
        _outputId = dataset1Id;
    }
}

// Throws when the first transform input carries the id "$" but has no dataset object attached.
private static void EnsurePrimaryDatasetAvailable(string datasetId, object dataset, int transformIndex)
{
    if (datasetId == "$" && dataset == null)
    {
        throw new InvalidOperationException(
            "Error: The first input dataset (id \"$\") of the transform at index " + transformIndex + " is null");
    }
}