in MPI/Intercommunicator.cs [752:859]
public void AlltoallFlattened<T>(T[] inValues, int[] sendCounts, int[] recvCounts, ref T[] outValues)
{
    // Collective all-to-all exchange across the intercommunicator using flattened arrays.
    //
    //   inValues   - values to send, laid out contiguously: the first sendCounts[0] elements
    //                go to remote rank 0, the next sendCounts[1] to remote rank 1, and so on.
    //   sendCounts - number of elements sent to each remote rank; length must equal RemoteSize.
    //   recvCounts - number of elements expected from each remote rank; length must equal RemoteSize.
    //   outValues  - receives the incoming values, concatenated in remote-rank order. It is
    //                (re)allocated when null or when its length differs from the sum of recvCounts.
    //
    // Throws ArgumentNullException when inValues, sendCounts or recvCounts is null,
    // ArgumentException when a count array has the wrong length or inValues is too short,
    // OverflowException when a running total does not fit in an int, and the exception
    // produced by Environment.TranslateErrorIntoException when the MPI call reports failure.
    if (inValues == null)
        throw new ArgumentNullException("inValues");
    if (sendCounts == null)
        throw new ArgumentNullException("sendCounts");
    if (recvCounts == null)
        throw new ArgumentNullException("recvCounts");

    // Read the property once; every loop below is bounded by the same value.
    int remoteSize = RemoteSize;

    if (sendCounts.Length != remoteSize)
        throw new ArgumentException("sendCounts should be the size of the remote group", "sendCounts");
    if (recvCounts.Length != remoteSize)
        throw new ArgumentException("recvCounts should be the size of the remote group", "recvCounts");

    // Validate the send side before anything is handed to native code: in the direct
    // datatype path MPI reads straight out of the pinned array, so a count total larger
    // than the array would read past its end instead of failing cleanly.
    int totalSendCount = 0;
    for (int i = 0; i < sendCounts.Length; i++) checked
    {
        totalSendCount += sendCounts[i];
    }
    if (totalSendCount > inValues.Length)
        throw new ArgumentException("inValues does not contain as many elements as sendCounts specifies", "inValues");

    int totalCounts = 0;
    for (int i = 0; i < recvCounts.Length; i++) checked
    {
        totalCounts += recvCounts[i];
    }

    // Make sure the outgoing array is the right size
    if (outValues == null || outValues.Length != totalCounts)
        outValues = new T[totalCounts];

    MPI_Datatype datatype = FastDatatypeCache<T>.datatype;
    if (datatype == Unsafe.MPI_DATATYPE_NULL)
    {
        T[] temp;
        int currentLocation;

        // There is no associated MPI datatype for this type, so we will
        // need to serialize the value for transmission.
        int[] sendCountsSerialized = new int[remoteSize];
        int[] sendOffsets = new int[remoteSize];

        using (UnmanagedMemoryStream sendStream = new UnmanagedMemoryStream())
        {
            currentLocation = 0;

            // Serialize all of the outgoing data to the outgoing stream, one
            // array per destination so each slice can be de-serialized on its own.
            for (int dest = 0; dest < remoteSize; ++dest) checked
            {
                sendOffsets[dest] = Convert.ToInt32(sendStream.Length);
                temp = new T[sendCounts[dest]];
                Array.Copy(inValues, currentLocation, temp, 0, sendCounts[dest]);
                currentLocation += sendCounts[dest];
                Serialize(sendStream, temp);
                sendCountsSerialized[dest] = Convert.ToInt32(sendStream.Length) - sendOffsets[dest];
            }

            // Use all-to-all on integers to tell every process how much data
            // it will be receiving.
            int[] recvCountsSerialized = Alltoall(sendCountsSerialized);

            // Compute the offsets at which each of the streams will be received
            int[] recvOffsets = new int[remoteSize];
            recvOffsets[0] = 0;
            for (int i = 1; i < remoteSize; ++i) checked
            {
                recvOffsets[i] = recvOffsets[i - 1] + recvCountsSerialized[i - 1];
            }

            // Total length of the receive buffer
            int recvLength = checked(recvOffsets[remoteSize - 1] + recvCountsSerialized[remoteSize - 1]);

            using (UnmanagedMemoryStream recvStream = new UnmanagedMemoryStream(recvLength))
            {
                // Build receive buffer and exchange all of the data
                unsafe
                {
                    int errorCode = Unsafe.MPI_Alltoallv(sendStream.Buffer, sendCountsSerialized, sendOffsets, Unsafe.MPI_BYTE,
                                                         recvStream.Buffer, recvCountsSerialized, recvOffsets, Unsafe.MPI_BYTE, comm);
                    if (errorCode != Unsafe.MPI_SUCCESS)
                        throw Environment.TranslateErrorIntoException(errorCode);
                }

                // De-serialize the received data
                currentLocation = 0;
                for (int source = 0; source < remoteSize; ++source) checked
                {
                    // Seek to the proper location in the stream and de-serialize
                    recvStream.Position = recvOffsets[source];
                    temp = Deserialize<T[]>(recvStream);
                    Array.Copy(temp, 0, outValues, currentLocation, temp.Length);
                    currentLocation += temp.Length;
                }
            }
        }
    }
    else
    {
        // T maps directly onto an MPI datatype: exchange the arrays in place.
        // Displacements are the running totals of the per-rank counts.
        int[] sendDispls = new int[remoteSize];
        int[] recvDispls = new int[remoteSize];
        sendDispls[0] = 0;
        recvDispls[0] = 0;
        for (int i = 1; i < remoteSize; i++) checked
        {
            sendDispls[i] = sendDispls[i - 1] + sendCounts[i - 1];
            recvDispls[i] = recvDispls[i - 1] + recvCounts[i - 1];
        }

        int errorCode;

        // Pin both arrays for the duration of the native call. The nested try/finally
        // blocks guarantee each handle is released even if the second pin or the
        // native call throws, so the arrays never stay pinned indefinitely.
        GCHandle inHandle = GCHandle.Alloc(inValues, GCHandleType.Pinned);
        try
        {
            GCHandle outHandle = GCHandle.Alloc(outValues, GCHandleType.Pinned);
            try
            {
                unsafe
                {
                    errorCode = Unsafe.MPI_Alltoallv(Marshal.UnsafeAddrOfPinnedArrayElement(inValues, 0), sendCounts, sendDispls, datatype,
                                                     Marshal.UnsafeAddrOfPinnedArrayElement(outValues, 0), recvCounts, recvDispls, datatype, comm);
                }
            }
            finally
            {
                outHandle.Free();
            }
        }
        finally
        {
            inHandle.Free();
        }

        if (errorCode != Unsafe.MPI_SUCCESS)
            throw Environment.TranslateErrorIntoException(errorCode);
    }
}