in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedEndpoints/ShardedShuffleInput.cs [89:190]
/// <summary>
/// Applies, in order, every transform configured on the vertex's task to the datasets cached for
/// this shard, writing each non-null result back into the shard's dataset cache.
/// </summary>
/// <remarks>
/// Transform <c>i</c> is described by parallel entries on the task:
/// <c>TransformsInputs[i]</c> (input dataset identifiers), <c>TransformsOperations[i]</c>
/// (an <see cref="OperatorType"/> name), <c>TransformsTypes[i]</c> (the string parsed into the
/// generic type arguments) and <c>Transforms[i]</c> (the transform argument handed to the utility
/// method). Each iteration also overwrites the vertex's output key/payload/dataset types and
/// <c>_outputId</c>. After the loop they therefore describe the last transform applied.
/// </remarks>
/// <exception cref="InvalidOperationException">
/// A transform's primary input id is "$" but no dataset was resolved for it, or the operation
/// type is not one of UnaryTransform, BinaryTransform or MoveSplit.
/// </exception>
private void ApplyTransformersOnInputs()
{
    var task = _vertex._task;
    if (task.Transforms == null)
    {
        return;
    }

    for (int i = 0; i < task.Transforms.Length; i++)
    {
        object dataset1 = null; string dataset1Id = null;
        object dataset2 = null; string dataset2Id = null;
        // Resolve the input datasets/ids for this transform; the shard's cache is passed in,
        // presumably so previously produced datasets can be looked up — confirm in TransformUtils.
        TransformUtils.PrepareTransformInputs(task.TransformsInputs[i], ref dataset1, ref dataset1Id,
            ref dataset2, ref dataset2Id, _vertex._cachedDatasets[_shardId]);

        string transformType = task.TransformsOperations[i];
        MethodInfo generic;
        object[] arguments;

        if (transformType == OperatorType.UnaryTransform.ToString())
        {
            UnaryOperatorTypes unaryTypes = new UnaryOperatorTypes();
            unaryTypes.FromString(task.TransformsTypes[i]);
            EnsurePrimaryInputResolved(dataset1Id, dataset1, i);

            generic = MakeGenericUtility(typeof(TransformUtils), "ApplyUnaryTransformer",
                new Type[] { unaryTypes.InputKeyType, unaryTypes.InputPayloadType,
                    unaryTypes.InputDatasetType, unaryTypes.OutputKeyType,
                    unaryTypes.OutputPayloadType, unaryTypes.OutputDatasetType });
            arguments = new object[] { dataset1, task.Transforms[i] };

            _vertex._outputKeyType = unaryTypes.OutputKeyType;
            _vertex._outputPayloadType = unaryTypes.OutputPayloadType;
            _vertex._outputDatasetType = unaryTypes.OutputDatasetType;
        }
        else if (transformType == OperatorType.BinaryTransform.ToString())
        {
            BinaryOperatorTypes binaryTypes = new BinaryOperatorTypes();
            binaryTypes.FromString(task.TransformsTypes[i]);
            EnsurePrimaryInputResolved(dataset1Id, dataset1, i);

            if (dataset2Id == "$" && dataset2 == null)
            {
                // The secondary input is not available yet: record the operator types for it,
                // signal the "start creating" handle, block on the "finish creating" handle, then
                // read the secondary dataset from this shard's cache.
                // NOTE(review): this assumes whoever waits on the start handle populates
                // _cachedDatasets[_shardId][dataset2Id] before signalling finish — confirm.
                dataset2Id = task.TransformsInputs[i].InputId2;
                _vertex._binaryOperatorTypes[dataset2Id] = binaryTypes;
                _vertex._startCreatingSecondaryDatasets[dataset2Id].Signal();
                _vertex._finishCreatingSecondaryDatasets[dataset2Id].Wait();
                dataset2 = _vertex._cachedDatasets[_shardId][dataset2Id];
            }

            generic = MakeGenericUtility(typeof(TransformUtils), "ApplyBinaryTransformer",
                new Type[] { binaryTypes.InputKeyType, binaryTypes.InputPayloadType,
                    binaryTypes.InputDatasetType, binaryTypes.SecondaryKeyType,
                    binaryTypes.SecondaryPayloadType, binaryTypes.SecondaryDatasetType,
                    binaryTypes.OutputKeyType, binaryTypes.OutputPayloadType,
                    binaryTypes.OutputDatasetType });
            arguments = new object[] { dataset1, dataset2, task.Transforms[i] };

            _vertex._outputKeyType = binaryTypes.OutputKeyType;
            _vertex._outputPayloadType = binaryTypes.OutputPayloadType;
            _vertex._outputDatasetType = binaryTypes.OutputDatasetType;
        }
        else if (transformType == OperatorType.MoveSplit.ToString())
        {
            BinaryOperatorTypes splitTypes = new BinaryOperatorTypes();
            splitTypes.FromString(task.TransformsTypes[i]);
            EnsurePrimaryInputResolved(dataset1Id, dataset1, i);

            generic = MakeGenericUtility(typeof(MoveUtils), "ApplySplitter",
                new Type[] { splitTypes.InputKeyType, splitTypes.InputPayloadType,
                    splitTypes.InputDatasetType, splitTypes.SecondaryKeyType,
                    splitTypes.SecondaryPayloadType, splitTypes.SecondaryDatasetType });
            arguments = new object[] { dataset1, task.SecondaryShuffleDescriptor, task.Transforms[i] };

            // A split's output is described by the "secondary" half of its type descriptor.
            _vertex._outputKeyType = splitTypes.SecondaryKeyType;
            _vertex._outputPayloadType = splitTypes.SecondaryPayloadType;
            _vertex._outputDatasetType = splitTypes.SecondaryDatasetType;
        }
        else
        {
            throw new InvalidOperationException("Error: Unsupported transformation type: " + transformType);
        }

        // The utility methods live on TransformUtils/MoveUtils, which this endpoint is not an
        // instance of, so they must be static; the invocation target is therefore null.
        object transformOutput = generic.Invoke(null, arguments);
        if (transformOutput != null)
        {
            // Indexer assignment adds the entry or replaces an existing one in a single lookup.
            _vertex._cachedDatasets[_shardId][dataset1Id] = transformOutput;
        }
        _vertex._outputId = dataset1Id;
    }
}

/// <summary>
/// Throws when a transform's primary input id is the "$" placeholder but no dataset was resolved for it.
/// </summary>
/// <param name="datasetId">The resolved primary input id.</param>
/// <param name="dataset">The resolved primary input dataset, possibly null.</param>
/// <param name="transformIndex">Index of the transform, used only in the error message.</param>
private static void EnsurePrimaryInputResolved(string datasetId, object dataset, int transformIndex)
{
    if (datasetId == "$" && dataset == null)
    {
        throw new InvalidOperationException(
            "Error: the primary input of transform " + transformIndex + " could not be resolved");
    }
}

/// <summary>
/// Looks up the public method <paramref name="methodName"/> on <paramref name="owner"/> and closes
/// it over <paramref name="typeArguments"/>.
/// </summary>
/// <exception cref="MissingMethodException">No public method with that name exists on the type.</exception>
private static MethodInfo MakeGenericUtility(Type owner, string methodName, Type[] typeArguments)
{
    MethodInfo method = owner.GetMethod(methodName);
    if (method == null)
    {
        throw new MissingMethodException(owner.FullName, methodName);
    }
    return method.MakeGenericMethod(typeArguments);
}