internal override void InitializeOperator()

in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedProducerOperator.cs [44:109]


        /// <summary>
        /// Sets up this producer operator for a single shard: sizes the countdown events that
        /// gate the produce-input and produce-output phases, registers an async endpoint for
        /// every input and output connection of the vertex, and, when the operator has no
        /// secondary input, creates and transforms the dataset right away.
        /// </summary>
        /// <param name="shardId">Shard identifier forwarded to dataset creation and to every endpoint.</param>
        /// <param name="shardingInfo">Sharding description; the length of <c>AllShards</c> is used as the shard count.</param>
        /// <exception cref="NotImplementedException">
        /// Thrown when an input connection's <c>Item4</c> flag is false. Input endpoints that
        /// precede the offending one have already been registered at that point.
        /// </exception>
        internal override void InitializeOperator(int shardId, ShardingInfo shardingInfo)
        {
            _hasSplittedOutput = HasSplittedOutput();

            // Connection maps are keyed by the part of the vertex name in front of the first '$'.
            string vertexKey = VertexName.Split('$')[0];
            string[] inputEndpoints = GetEndpointNamesForVertex(vertexKey, _toFromConnections);
            string[] outputEndpoints = GetEndpointNamesForVertex(vertexKey, _fromToConnections);

            // Classify outgoing connections by their Item4 flag (treated here as "secondary").
            int secondaryOutputs = 0;
            int ordinaryOutputs = 0;
            for (int i = 0; i < outputEndpoints.Length; i++)
            {
                var target = _fromToConnections[Tuple.Create(vertexKey, outputEndpoints[i])];
                if (target.Item4)
                {
                    secondaryOutputs++;
                }
                else
                {
                    ordinaryOutputs++;
                }
            }

            // With a splitted output the ordinary share is one signal per shard; otherwise it is
            // one signal per ordinary output endpoint. Secondary outputs always count once each.
            int produceInputSignals = secondaryOutputs
                + (_hasSplittedOutput ? shardingInfo.AllShards.Length : ordinaryOutputs);
            _deployProduceInput = new CountdownEvent(produceInputSignals);
            _runProduceInput = new CountdownEvent(produceInputSignals);

            // Count incoming connections whose Item4 flag is set.
            int secondaryInputs = 0;
            for (int i = 0; i < inputEndpoints.Length; i++)
            {
                var source = _toFromConnections[Tuple.Create(vertexKey, inputEndpoints[i])];
                if (source.Item4)
                {
                    secondaryInputs++;
                }
            }

            _deployProduceOutput = new CountdownEvent(secondaryInputs);
            _runProduceOutput = new CountdownEvent(secondaryInputs);
            _continueAfterTransformation = new CountdownEvent(1);

            // The flag is only ever raised here, never cleared.
            if (secondaryInputs > 0)
            {
                _hasSecondaryInput = true;
            }

            // Register the input side; only flagged (secondary) inputs are accepted.
            foreach (string inputEndpoint in inputEndpoints)
            {
                var source = _toFromConnections[Tuple.Create(vertexKey, inputEndpoint)];
                if (!source.Item4)
                {
                    throw new NotImplementedException("Shared input endpoints are not supported in produce operators!!");
                }

                _startCreatingSecondaryDatasets[source.Item1] = new CountdownEvent(1);
                _finishCreatingSecondaryDatasets[source.Item1] = new CountdownEvent(1);
                AddAsyncInputEndpoint(
                    inputEndpoint,
                    new ShardedProducerSecondaryInput(this, shardId, shardingInfo.AllShards.Length, inputEndpoint));
            }

            // Without a secondary input the dataset is built immediately and the
            // continue-after-transformation event is signalled.
            if (!_hasSecondaryInput)
            {
                CreateAndTransformDataset(shardId);
                _isTransformationsApplied = true;
                _continueAfterTransformation.Signal();
            }

            // Register the output side, picking the endpoint type from the Item4 flag.
            foreach (string outputEndpoint in outputEndpoints)
            {
                var target = _fromToConnections[Tuple.Create(vertexKey, outputEndpoint)];
                if (target.Item4)
                {
                    AddAsyncOutputEndpoint(
                        outputEndpoint,
                        new ShardedProducerSecondaryOutput(this, shardId, shardingInfo.AllShards.Length, outputEndpoint));
                }
                else
                {
                    AddAsyncOutputEndpoint(
                        outputEndpoint,
                        new ShardedProducerOutput(this, shardId, shardingInfo.AllShards.Length, outputEndpoint));
                }
            }
        }