public void ReduceScatter()

in MPI/Intracommunicator.cs [1427:1540]


        public void ReduceScatter<T>(T[] inValues, ReductionOperation<T> op, int[] counts, ref T[] outValues)
        {
            // Make sure the outgoing array is the right size
            if (outValues == null || outValues.Length != counts[Rank])
                outValues = new T[counts[Rank]];

            // Make sure inValues has the right length
            int sum = 0;
            for (int i = 0; i < Size; ++i) checked
            {
                sum += counts[i];
            }
            if (inValues.Length != sum)
                throw new ArgumentException("inValues array must have enough values for all of the receivers", "inValues");

            MPI_Datatype datatype = FastDatatypeCache<T>.datatype;

            if (datatype == Unsafe.MPI_DATATYPE_NULL)
            {
                if (Size == 1)
                {
                    for (int i = 0; i < inValues.Length; ++i)
                        outValues[i] = inValues[i];
                    return;
                }

                // Reduce the entire array to a root
                T[] results = Reduce(inValues, op, 0);

                if (Rank == 0)
                {
                    // This is the root process
                    using (UnmanagedMemoryStream sendStream = new UnmanagedMemoryStream())
                    {
                        // Serialize all of the outgoing data for the scatter
                        int[] resultCounts = new int[Size];
                        int[] resultOffsets = new int[Size];
                        resultCounts[0] = 0;
                        resultOffsets[0] = 0;
                        int position = counts[0];
                        for (int dest = 1; dest < Size; ++dest)
                        {
                            // Serialize the outgoing elements for the process with rank "dest"
                            resultOffsets[dest] = Convert.ToInt32(sendStream.Length);
                            for (int i = 0; i < counts[dest]; ++i)
                            {
                                Serialize(sendStream, results[position]);
                                ++position;
                            }
                            resultCounts[dest] = checked(Convert.ToInt32(sendStream.Length) - resultOffsets[dest]);
                        }

                        // Scatter the counts to tell each process how many bytes to expect
                        Scatter(true, resultCounts, 0);

                        // Scatter the reduced, serialized data to all of the other processes
                        unsafe
                        {
                            int errorCode = Unsafe.MPI_Scatterv(sendStream.Buffer, resultCounts, resultOffsets, Unsafe.MPI_BYTE,
                                                                new IntPtr(), 0, Unsafe.MPI_BYTE, 0, comm);
                            if (errorCode != Unsafe.MPI_SUCCESS)
                                throw Environment.TranslateErrorIntoException(errorCode);
                        }

                        // Copy our own results to the resulting array
                        for (int i = 0; i < counts[0]; ++i)
                            outValues[i] = results[i];
                    }
                }
                else
                {
                    // The number of bytes we should expect from the root
                    int receiveCount = Scatter<int>(0);

                    using (UnmanagedMemoryStream receiveStream = new UnmanagedMemoryStream(receiveCount))
                    {
                        // Receive the serialized form of our part of the result
                        unsafe
                        {
                            int errorCode = Unsafe.MPI_Scatterv(new IntPtr(), null, null, Unsafe.MPI_BYTE,
                                                                receiveStream.Buffer, receiveCount, Unsafe.MPI_BYTE, 0, comm);
                            if (errorCode != Unsafe.MPI_SUCCESS)
                                throw Environment.TranslateErrorIntoException(errorCode);
                        }

                        // Deserialize the incoming stream
                        for (int i = 0; i < counts[Rank]; ++i)
                        {
                            // Seek to the appropriate position in the stream, since we cannot trust the deserializer to stop at the right place.
                            //receiveStream.Position = ???
                            throw new NotImplementedException();
                            outValues[i] = Deserialize<T>(receiveStream);
                        }
                    }
                }
            }
            else
            {
                // Use the low-level MPI reduce-scatter operation from the root
                using (Operation<T> mpiOp = new Operation<T>(op))
                {
                    GCHandle inHandle = GCHandle.Alloc(inValues, GCHandleType.Pinned);
                    GCHandle outHandle = GCHandle.Alloc(outValues, GCHandleType.Pinned);
                    int errorCode = Unsafe.MPI_Reduce_scatter(inHandle.AddrOfPinnedObject(),
                                                              outHandle.AddrOfPinnedObject(),
                                                              counts, datatype, mpiOp.Op, comm);
                    inHandle.Free();
                    outHandle.Free();

                    if (errorCode != Unsafe.MPI_SUCCESS)
                        throw Environment.TranslateErrorIntoException(errorCode);
                }
            }
        }