in MPI/Intracommunicator.cs [1427:1540]
/// <summary>
/// Combines <paramref name="inValues"/> from every process with <paramref name="op"/> and
/// distributes the combined array so that the process with rank <c>i</c> receives
/// <c>counts[i]</c> consecutive elements of it in <paramref name="outValues"/>.
/// </summary>
/// <typeparam name="T">Element type of the arrays being reduced.</typeparam>
/// <param name="inValues">
/// The local contribution. Its length must equal the sum of the first <c>Size</c> entries of
/// <paramref name="counts"/>.
/// </param>
/// <param name="op">The reduction operation used to combine the values.</param>
/// <param name="counts">
/// Number of result elements destined for each rank; needs at least <c>Size</c> entries.
/// </param>
/// <param name="outValues">
/// Receives this process's portion of the result. It is replaced by a newly allocated array
/// when it is null or its length differs from <c>counts[Rank]</c>.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="inValues"/> or <paramref name="counts"/> is null.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="counts"/> has fewer than <c>Size</c> entries, or the length of
/// <paramref name="inValues"/> does not match the sum of the counts.
/// </exception>
/// <exception cref="InvalidOperationException">
/// The serialized segment received from rank 0 does not contain <c>counts[Rank]</c> elements.
/// </exception>
public void ReduceScatter<T>(T[] inValues, ReductionOperation<T> op, int[] counts, ref T[] outValues)
{
    // Validate before the first counts[Rank] access so bad arguments are reported as such.
    if (inValues == null)
        throw new ArgumentNullException("inValues");
    if (counts == null)
        throw new ArgumentNullException("counts");
    if (counts.Length < Size)
        throw new ArgumentException("counts array must have one entry for each process", "counts");

    // Make sure the outgoing array is the right size
    if (outValues == null || outValues.Length != counts[Rank])
        outValues = new T[counts[Rank]];

    // Make sure inValues has the right length (overflow while summing is trapped)
    int sum = 0;
    checked
    {
        for (int i = 0; i < Size; ++i)
            sum += counts[i];
    }
    if (inValues.Length != sum)
        throw new ArgumentException("inValues array must have enough values for all of the receivers", "inValues");

    MPI_Datatype datatype = FastDatatypeCache<T>.datatype;
    if (datatype == Unsafe.MPI_DATATYPE_NULL)
    {
        // No MPI datatype for T: reduce to rank 0, then scatter serialized segments.
        if (Size == 1)
        {
            // Single process: the input is copied to the output unchanged.
            Array.Copy(inValues, outValues, inValues.Length);
            return;
        }

        // Reduce the entire array to a root
        T[] results = Reduce(inValues, op, 0);
        if (Rank == 0)
        {
            // This is the root process
            using (UnmanagedMemoryStream sendStream = new UnmanagedMemoryStream())
            {
                // Byte count and byte offset of each destination's serialized segment.
                // Entry 0 stays zero: the root copies its own part directly below.
                int[] resultCounts = new int[Size];
                int[] resultOffsets = new int[Size];
                int position = counts[0];
                for (int dest = 1; dest < Size; ++dest)
                {
                    resultOffsets[dest] = Convert.ToInt32(sendStream.Length);
                    if (counts[dest] > 0)
                    {
                        // Serialize the whole segment as ONE array object so the receiver
                        // performs a single Deserialize call and never relies on the
                        // deserializer stopping exactly at an element boundary.
                        // NOTE(review): assumes Serialize/Deserialize accept array types - confirm.
                        T[] segment = new T[counts[dest]];
                        Array.Copy(results, position, segment, 0, segment.Length);
                        Serialize(sendStream, segment);
                        position += segment.Length;
                    }
                    resultCounts[dest] = checked(Convert.ToInt32(sendStream.Length) - resultOffsets[dest]);
                }

                // Scatter the counts to tell each process how many bytes to expect
                Scatter(true, resultCounts, 0);

                // Scatter the reduced, serialized data to all of the other processes
                unsafe
                {
                    int errorCode = Unsafe.MPI_Scatterv(sendStream.Buffer, resultCounts, resultOffsets, Unsafe.MPI_BYTE,
                                                        new IntPtr(), 0, Unsafe.MPI_BYTE, 0, comm);
                    if (errorCode != Unsafe.MPI_SUCCESS)
                        throw Environment.TranslateErrorIntoException(errorCode);
                }

                // Copy our own results to the resulting array
                Array.Copy(results, outValues, counts[0]);
            }
        }
        else
        {
            // The number of bytes we should expect from the root
            int receiveCount = Scatter<int>(0);
            using (UnmanagedMemoryStream receiveStream = new UnmanagedMemoryStream(receiveCount))
            {
                // Receive the serialized form of our part of the result
                unsafe
                {
                    int errorCode = Unsafe.MPI_Scatterv(new IntPtr(), null, null, Unsafe.MPI_BYTE,
                                                        receiveStream.Buffer, receiveCount, Unsafe.MPI_BYTE, 0, comm);
                    if (errorCode != Unsafe.MPI_SUCCESS)
                        throw Environment.TranslateErrorIntoException(errorCode);
                }

                // The root serializes nothing for a rank whose count is zero, so only
                // deserialize when elements are expected.
                if (counts[Rank] > 0)
                {
                    T[] received = Deserialize<T[]>(receiveStream);
                    if (received == null || received.Length != counts[Rank])
                        throw new InvalidOperationException("received segment does not match counts[Rank]; counts must be identical on every process");
                    Array.Copy(received, outValues, received.Length);
                }
            }
        }
    }
    else
    {
        // T maps to an MPI datatype: use the low-level MPI reduce-scatter operation directly
        using (Operation<T> mpiOp = new Operation<T>(op))
        {
            int errorCode;
            GCHandle inHandle = GCHandle.Alloc(inValues, GCHandleType.Pinned);
            try
            {
                GCHandle outHandle = GCHandle.Alloc(outValues, GCHandleType.Pinned);
                try
                {
                    errorCode = Unsafe.MPI_Reduce_scatter(inHandle.AddrOfPinnedObject(),
                                                          outHandle.AddrOfPinnedObject(),
                                                          counts, datatype, mpiOp.Op, comm);
                }
                finally
                {
                    // Unpin even if the native call throws
                    outHandle.Free();
                }
            }
            finally
            {
                inHandle.Free();
            }

            if (errorCode != Unsafe.MPI_SUCCESS)
                throw Environment.TranslateErrorIntoException(errorCode);
        }
    }
}