public override async Task OperatorOutputToStreamAsync()

in src/CRA.ClientLibrary/DataProcessing/ShardedOperators/ShardedEndpoints/ShardedProducerOutput.cs [19:120]


        public override async Task OperatorOutputToStreamAsync(Stream stream, string otherVertex, int otherShardId, string otherEndpoint, CancellationToken token)
        {
            _startSendingToOtherOperatorShards.Signal();  
            _startSendingToOtherOperatorShards.Wait();

            if (!_vertex._hasSplittedOutput)
            {
                if (_shardId == otherShardId)
                {
                    // Start deploying
                    await stream.ReadAllRequiredBytesAsync(_deployMsgBuffer, 0, _deployMsgBuffer.Length);
                    if (Encoding.ASCII.GetString(_deployMsgBuffer).Equals("DEPLOY"))
                    {
                        if (_vertex._hasSecondaryInput) _vertex._deployProduceInput.Signal();
                        if (_vertex._hasSecondaryInput) _vertex._deployProduceOutput.Wait();

                        await stream.WriteAsync(_deployMsgBuffer, 0, _deployMsgBuffer.Length);

                        // Start running
                        await stream.ReadAllRequiredBytesAsync(_runMsgBuffer, 0, _runMsgBuffer.Length);
                        if (Encoding.ASCII.GetString(_runMsgBuffer).Equals("RUN"))
                        {
                            if (_vertex._hasSecondaryInput) _vertex._runProduceInput.Signal();

                            if (_vertex._hasSecondaryInput)
                            {
                                if (!_vertex._isTransformationsApplied)
                                {
                                    lock (_vertex._transformationLock)
                                    {
                                        if (!_vertex._isTransformationsApplied)
                                        {
                                            _vertex.CreateAndTransformDataset(_shardId);
                                            _vertex._isTransformationsApplied = true;

                                            _vertex._continueAfterTransformation.Signal();
                                        }
                                    }
                                }
                            }

                            _vertex._continueAfterTransformation.Wait();

                            if (_vertex._hasSecondaryInput) _vertex._runProduceOutput.Wait();

                            MethodInfo method = typeof(ShardedOperatorOutputBase).GetMethod("StartProducer");
                            MethodInfo generic = method.MakeGenericMethod(
                                    new Type[] { _vertex._outputKeyType, _vertex._outputPayloadType, _vertex._outputDatasetType });
                            generic.Invoke(this, new Object[] { _vertex._cachedDatasets[_shardId][_vertex._outputId], stream });
                        }
                    }
                }
            }
            else
            {
                await stream.ReadAllRequiredBytesAsync(_deployMsgBuffer, 0, _deployMsgBuffer.Length);
                if (Encoding.ASCII.GetString(_deployMsgBuffer).Equals("DEPLOY"))
                {
                    if (_vertex._hasSecondaryInput) _vertex._deployProduceInput.Signal();
                    if (_vertex._hasSecondaryInput) _vertex._deployProduceOutput.Wait();

                    await stream.WriteAsync(_deployMsgBuffer, 0, _deployMsgBuffer.Length);

                    // Start running
                    await stream.ReadAllRequiredBytesAsync(_runMsgBuffer, 0, _runMsgBuffer.Length);
                    if (Encoding.ASCII.GetString(_runMsgBuffer).Equals("RUN"))
                    {
                        if (_vertex._hasSecondaryInput) _vertex._runProduceInput.Signal();

                        if ((otherShardId == 0) && _vertex._hasSecondaryInput)
                        {
                            if (!_vertex._isTransformationsApplied)
                            {
                                lock (_vertex._transformationLock)
                                {
                                    if (!_vertex._isTransformationsApplied)
                                    {
                                        _vertex.CreateAndTransformDataset(_shardId);
                                        _vertex._isTransformationsApplied = true;

                                        _vertex._continueAfterTransformation.Signal();
                                    }
                                }
                            }
                        }

                        _vertex._continueAfterTransformation.Wait();

                        if (_vertex._hasSecondaryInput) _vertex._runProduceOutput.Wait();

                        object[] splitDatasets = (object[])_vertex._cachedDatasets[_shardId][_vertex._outputId];
                        MethodInfo method = typeof(ShardedOperatorOutputBase).GetMethod("StartProducer");
                        MethodInfo generic = method.MakeGenericMethod(
                                new Type[] { _vertex._outputKeyType, _vertex._outputPayloadType, _vertex._outputDatasetType });
                        generic.Invoke(this, new Object[] { splitDatasets[otherShardId], stream });
                    }
                }
            }

            _finishSendingToOtherOperatorShards.Signal();
            _finishSendingToOtherOperatorShards.Wait();
        }