private void ApplyTransformersOnInputs()

in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedEndpoints/ShardedShuffleInput.cs [89:190]


        /// <summary>
        /// Applies every transformation listed in <c>_vertex._task.Transforms</c>, in order, to the
        /// datasets cached for this shard, and stores each non-null result back into that cache.
        /// </summary>
        /// <remarks>
        /// For each transformation the inputs are resolved through
        /// <c>TransformUtils.PrepareTransformInputs</c>; the matching generic helper
        /// (<c>TransformUtils.ApplyUnaryTransformer</c>, <c>TransformUtils.ApplyBinaryTransformer</c> or
        /// <c>MoveUtils.ApplySplitter</c>) is closed over the types parsed from
        /// <c>_vertex._task.TransformsTypes[i]</c> and invoked through reflection. The vertex's output
        /// key/payload/dataset types and output id are overwritten on every iteration, so after the loop
        /// they describe the last transformation. The method returns immediately when
        /// <c>_vertex._task.Transforms</c> is null.
        /// Exceptions raised inside an invoked helper reach the caller wrapped in a
        /// <see cref="TargetInvocationException"/>, because the call goes through
        /// <see cref="MethodBase.Invoke(object, object[])"/>.
        /// </remarks>
        /// <exception cref="InvalidOperationException">
        /// The transformation type is not supported, the primary input id is "$" but no dataset was
        /// resolved for it, or the reflection target method could not be found.
        /// </exception>
        private void ApplyTransformersOnInputs()
        {
            if (_vertex._task.Transforms == null)
            {
                return;
            }

            for (int i = 0; i < _vertex._task.Transforms.Length; i++)
            {
                object dataset1 = null; string dataset1Id = null;
                object dataset2 = null; string dataset2Id = null;
                TransformUtils.PrepareTransformInputs(_vertex._task.TransformsInputs[i], ref dataset1, ref dataset1Id,
                                    ref dataset2, ref dataset2Id, _vertex._cachedDatasets[_shardId]);

                string transformType = _vertex._task.TransformsOperations[i];
                MethodInfo generic;
                object[] arguments;

                if (transformType == OperatorType.UnaryTransform.ToString())
                {
                    UnaryOperatorTypes unaryTransformTypes = new UnaryOperatorTypes();
                    unaryTransformTypes.FromString(_vertex._task.TransformsTypes[i]);
                    EnsurePrimaryTransformInputResolved(dataset1Id, dataset1, i);

                    generic = MakeGenericTransformMethod(typeof(TransformUtils), "ApplyUnaryTransformer",
                            new Type[] { unaryTransformTypes.InputKeyType, unaryTransformTypes.InputPayloadType,
                                    unaryTransformTypes.InputDatasetType, unaryTransformTypes.OutputKeyType,
                                    unaryTransformTypes.OutputPayloadType, unaryTransformTypes.OutputDatasetType
                            });
                    arguments = new Object[] { dataset1, _vertex._task.Transforms[i] };

                    _vertex._outputKeyType = unaryTransformTypes.OutputKeyType;
                    _vertex._outputPayloadType = unaryTransformTypes.OutputPayloadType;
                    _vertex._outputDatasetType = unaryTransformTypes.OutputDatasetType;
                }
                else if (transformType == OperatorType.BinaryTransform.ToString())
                {
                    BinaryOperatorTypes binaryTransformTypes = new BinaryOperatorTypes();
                    binaryTransformTypes.FromString(_vertex._task.TransformsTypes[i]);
                    EnsurePrimaryTransformInputResolved(dataset1Id, dataset1, i);

                    if (dataset2Id == "$" && dataset2 == null)
                    {
                        // The secondary input is not cached yet: register its type description under the
                        // real secondary input id, signal the "start" handle and block on the "finish"
                        // handle, then read the dataset from the cache.
                        // NOTE(review): presumably another component creates the secondary dataset
                        // between Signal() and Wait() - confirm against the owner of these handles.
                        dataset2Id = _vertex._task.TransformsInputs[i].InputId2;
                        _vertex._binaryOperatorTypes[dataset2Id] = binaryTransformTypes;

                        _vertex._startCreatingSecondaryDatasets[dataset2Id].Signal();
                        _vertex._finishCreatingSecondaryDatasets[dataset2Id].Wait();

                        dataset2 = _vertex._cachedDatasets[_shardId][dataset2Id];
                    }

                    generic = MakeGenericTransformMethod(typeof(TransformUtils), "ApplyBinaryTransformer",
                        new Type[] {binaryTransformTypes.InputKeyType, binaryTransformTypes.InputPayloadType,
                            binaryTransformTypes.InputDatasetType, binaryTransformTypes.SecondaryKeyType,
                            binaryTransformTypes.SecondaryPayloadType, binaryTransformTypes.SecondaryDatasetType,
                            binaryTransformTypes.OutputKeyType, binaryTransformTypes.OutputPayloadType,
                            binaryTransformTypes.OutputDatasetType
                        });
                    arguments = new Object[] { dataset1, dataset2, _vertex._task.Transforms[i] };

                    _vertex._outputKeyType = binaryTransformTypes.OutputKeyType;
                    _vertex._outputPayloadType = binaryTransformTypes.OutputPayloadType;
                    _vertex._outputDatasetType = binaryTransformTypes.OutputDatasetType;
                }
                else if (transformType == OperatorType.MoveSplit.ToString())
                {
                    BinaryOperatorTypes splitTypes = new BinaryOperatorTypes();
                    splitTypes.FromString(_vertex._task.TransformsTypes[i]);
                    EnsurePrimaryTransformInputResolved(dataset1Id, dataset1, i);

                    generic = MakeGenericTransformMethod(typeof(MoveUtils), "ApplySplitter",
                        new Type[] {splitTypes.InputKeyType, splitTypes.InputPayloadType,
                                splitTypes.InputDatasetType, splitTypes.SecondaryKeyType,
                                splitTypes.SecondaryPayloadType, splitTypes.SecondaryDatasetType
                        });
                    arguments = new Object[] { dataset1, _vertex._task.SecondaryShuffleDescriptor, _vertex._task.Transforms[i] };

                    // A split publishes the "secondary" types as the vertex output types.
                    _vertex._outputKeyType = splitTypes.SecondaryKeyType;
                    _vertex._outputPayloadType = splitTypes.SecondaryPayloadType;
                    _vertex._outputDatasetType = splitTypes.SecondaryDatasetType;
                }
                else
                {
                    throw new InvalidOperationException("Error: Unsupported transformation type");
                }

                // NOTE(review): the helpers are presumably static, in which case the target object passed
                // to Invoke is ignored; "this" is kept so the call is unchanged.
                object transformOutput = generic.Invoke(this, arguments);
                if (transformOutput != null)
                {
                    // The indexer setter adds the entry when the key is missing and replaces it otherwise,
                    // so no separate ContainsKey/Add step is required.
                    _vertex._cachedDatasets[_shardId][dataset1Id] = transformOutput;
                }

                _vertex._outputId = dataset1Id;
            }
        }

        /// <summary>
        /// Throws when the primary input id is "$" but no dataset was resolved for it.
        /// </summary>
        /// <param name="datasetId">Id of the primary input as resolved for the transformation.</param>
        /// <param name="dataset">Dataset resolved for the primary input; may be null.</param>
        /// <param name="transformIndex">Index of the transformation, used only in the error message.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="datasetId"/> is "$" and <paramref name="dataset"/> is null.
        /// </exception>
        private static void EnsurePrimaryTransformInputResolved(string datasetId, object dataset, int transformIndex)
        {
            if (datasetId == "$" && dataset == null)
            {
                throw new InvalidOperationException(
                    "Error: No dataset is available for primary input \"$\" of transformation " + transformIndex);
            }
        }

        /// <summary>
        /// Looks up the public method <paramref name="methodName"/> on <paramref name="declaringType"/>
        /// and closes it over <paramref name="typeArguments"/>.
        /// </summary>
        /// <param name="declaringType">Type that declares the generic method definition.</param>
        /// <param name="methodName">Name of the generic method definition.</param>
        /// <param name="typeArguments">Type arguments to substitute, in declaration order.</param>
        /// <returns>The constructed generic method, ready to be invoked.</returns>
        /// <exception cref="InvalidOperationException">No method with that name was found.</exception>
        private static MethodInfo MakeGenericTransformMethod(Type declaringType, string methodName, Type[] typeArguments)
        {
            MethodInfo method = declaringType.GetMethod(methodName);
            if (method == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException on the next line.
                throw new InvalidOperationException(
                    "Error: Method " + declaringType.Name + "." + methodName + " was not found");
            }

            return method.MakeGenericMethod(typeArguments);
        }