public void CreateAndTransformDataset(int shardId)

in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedProducerOperator.cs [111:224]


        /// <summary>
        /// Creates the dataset for <paramref name="shardId"/> from the produce task's data
        /// producer, then applies every entry of <c>_task.Transforms</c> in order, storing
        /// each non-null result back into <c>_cachedDatasets[shardId]</c>.
        /// </summary>
        /// <remarks>
        /// Side effects: <c>_outputKeyType</c>, <c>_outputPayloadType</c>,
        /// <c>_outputDatasetType</c> and <c>_outputId</c> are overwritten after the initial
        /// dataset is created and again for every transform, so on return they describe the
        /// last step that ran.
        /// For a binary transform whose secondary input id is "$" and whose dataset is null,
        /// the method calls <c>Signal()</c> on <c>_startCreatingSecondaryDatasets</c> and then
        /// <c>Wait()</c> on <c>_finishCreatingSecondaryDatasets</c> for that input before
        /// reading the secondary dataset from the cache.
        /// All dataset factories and transformers are invoked through reflection, so an
        /// exception thrown inside them reaches the caller wrapped in a
        /// <see cref="TargetInvocationException"/>.
        /// </remarks>
        /// <param name="shardId">Key used to index <c>_cachedDatasets</c>; also passed to the dataset factory.</param>
        /// <exception cref="InvalidOperationException">
        /// A primary input with id "$" is null, a transform type is not supported, or a
        /// reflectively resolved method cannot be found.
        /// </exception>
        public void CreateAndTransformDataset(int shardId)
        {
            var produceTask = (ProduceTask)_task;

            MethodInfo generic = BindGenericMethod(
                typeof(ShardedProducerOperator), "CreateDatasetFromExpression",
                produceTask.OperationTypes.OutputKeyType,
                produceTask.OperationTypes.OutputPayloadType,
                produceTask.OperationTypes.OutputDatasetType);
            object[] arguments = new object[] { shardId, produceTask.DataProducer };
            _cachedDatasets[shardId][produceTask.OutputId] = generic.Invoke(this, arguments);

            _outputKeyType = produceTask.OperationTypes.OutputKeyType;
            _outputPayloadType = produceTask.OperationTypes.OutputPayloadType;
            _outputDatasetType = produceTask.OperationTypes.OutputDatasetType;
            _outputId = produceTask.OutputId;

            if (_task.Transforms == null)
            {
                return;
            }

            // The operation names are loop-invariant; compute them once instead of per transform.
            string unaryTransformName = OperatorType.UnaryTransform.ToString();
            string binaryTransformName = OperatorType.BinaryTransform.ToString();
            string moveSplitName = OperatorType.MoveSplit.ToString();

            for (int i = 0; i < _task.Transforms.Length; i++)
            {
                object dataset1 = null; string dataset1Id = null;
                object dataset2 = null; string dataset2Id = null;
                TransformUtils.PrepareTransformInputs(_task.TransformsInputs[i], ref dataset1, ref dataset1Id,
                                    ref dataset2, ref dataset2Id, _cachedDatasets[shardId]);

                string transformType = _task.TransformsOperations[i];
                if (transformType == unaryTransformName)
                {
                    UnaryOperatorTypes unaryTransformTypes = new UnaryOperatorTypes();
                    unaryTransformTypes.FromString(_task.TransformsTypes[i]);
                    if (dataset1Id == "$" && dataset1 == null)
                    {
                        throw new InvalidOperationException(
                            "Error: Primary input dataset \"$\" of unary transform " + i + " is null");
                    }

                    generic = BindGenericMethod(
                        typeof(TransformUtils), "ApplyUnaryTransformer",
                        unaryTransformTypes.InputKeyType, unaryTransformTypes.InputPayloadType,
                        unaryTransformTypes.InputDatasetType, unaryTransformTypes.OutputKeyType,
                        unaryTransformTypes.OutputPayloadType, unaryTransformTypes.OutputDatasetType);
                    arguments = new object[] { dataset1, _task.Transforms[i] };

                    _outputKeyType = unaryTransformTypes.OutputKeyType;
                    _outputPayloadType = unaryTransformTypes.OutputPayloadType;
                    _outputDatasetType = unaryTransformTypes.OutputDatasetType;
                }
                else if (transformType == binaryTransformName)
                {
                    BinaryOperatorTypes binaryTransformTypes = new BinaryOperatorTypes();
                    binaryTransformTypes.FromString(_task.TransformsTypes[i]);
                    if (dataset1Id == "$" && dataset1 == null)
                    {
                        throw new InvalidOperationException(
                            "Error: Primary input dataset \"$\" of binary transform " + i + " is null");
                    }

                    if (dataset2Id == "$" && dataset2 == null)
                    {
                        // The secondary dataset is not available yet: publish the operator types
                        // under the real input id, signal the start event, and wait on the finish
                        // event before reading the dataset from the cache.
                        // NOTE(review): presumably another thread fills the cache entry between
                        // Signal() and Wait() returning - confirm against the secondary-dataset code.
                        dataset2Id = _task.TransformsInputs[i].InputId2;
                        _binaryOperatorTypes[dataset2Id] = binaryTransformTypes;

                        _startCreatingSecondaryDatasets[dataset2Id].Signal();
                        _finishCreatingSecondaryDatasets[dataset2Id].Wait();

                        dataset2 = _cachedDatasets[shardId][dataset2Id];
                    }

                    generic = BindGenericMethod(
                        typeof(TransformUtils), "ApplyBinaryTransformer",
                        binaryTransformTypes.InputKeyType, binaryTransformTypes.InputPayloadType,
                        binaryTransformTypes.InputDatasetType, binaryTransformTypes.SecondaryKeyType,
                        binaryTransformTypes.SecondaryPayloadType, binaryTransformTypes.SecondaryDatasetType,
                        binaryTransformTypes.OutputKeyType, binaryTransformTypes.OutputPayloadType,
                        binaryTransformTypes.OutputDatasetType);
                    arguments = new object[] { dataset1, dataset2, _task.Transforms[i] };

                    _outputKeyType = binaryTransformTypes.OutputKeyType;
                    _outputPayloadType = binaryTransformTypes.OutputPayloadType;
                    _outputDatasetType = binaryTransformTypes.OutputDatasetType;
                }
                else if (transformType == moveSplitName)
                {
                    BinaryOperatorTypes splitTypes = new BinaryOperatorTypes();
                    splitTypes.FromString(_task.TransformsTypes[i]);
                    if (dataset1Id == "$" && dataset1 == null)
                    {
                        throw new InvalidOperationException(
                            "Error: Primary input dataset \"$\" of split transform " + i + " is null");
                    }

                    generic = BindGenericMethod(
                        typeof(MoveUtils), "ApplySplitter",
                        splitTypes.InputKeyType, splitTypes.InputPayloadType,
                        splitTypes.InputDatasetType, splitTypes.SecondaryKeyType,
                        splitTypes.SecondaryPayloadType, splitTypes.SecondaryDatasetType);
                    arguments = new object[] { dataset1, _task.SecondaryShuffleDescriptor, _task.Transforms[i] };

                    // A split reports the *secondary* types as its output types.
                    _outputKeyType = splitTypes.SecondaryKeyType;
                    _outputPayloadType = splitTypes.SecondaryPayloadType;
                    _outputDatasetType = splitTypes.SecondaryDatasetType;
                }
                else
                {
                    throw new InvalidOperationException("Error: Unsupported transformation type");
                }

                // NOTE(review): `this` is passed as the invocation target as in the original code;
                // MethodInfo.Invoke ignores the target for static methods - confirm the utils are static.
                object transformOutput = generic.Invoke(this, arguments);
                if (transformOutput != null)
                {
                    // The indexer setter adds the key when absent and replaces the value otherwise.
                    _cachedDatasets[shardId][dataset1Id] = transformOutput;
                }

                _outputId = dataset1Id;
            }
        }

        /// <summary>
        /// Resolves the public method <paramref name="methodName"/> on
        /// <paramref name="declaringType"/> and closes it over <paramref name="typeArguments"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The method cannot be found.</exception>
        private static MethodInfo BindGenericMethod(Type declaringType, string methodName, params Type[] typeArguments)
        {
            MethodInfo definition = declaringType.GetMethod(methodName);
            if (definition == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException further down.
                throw new InvalidOperationException(
                    "Error: Method " + declaringType.Name + "." + methodName + " was not found");
            }

            return definition.MakeGenericMethod(typeArguments);
        }