in MPI/Intracommunicator.cs [700:774]
/// <summary>
/// All-to-all exchange of variable-length slices of <paramref name="inValues"/> for element
/// types that are transmitted in serialized form. Each outgoing slice is serialized into one
/// contiguous byte buffer, the per-destination byte counts are exchanged with
/// <c>Alltoall</c>, the raw bytes are exchanged with <c>MPI_Alltoallv</c>, and each incoming
/// slice is de-serialized into <paramref name="outValues"/>. The slice addressed to this
/// process itself is copied directly and never serialized.
/// </summary>
/// <typeparam name="T">Element type of the arrays being exchanged.</typeparam>
/// <param name="inValues">
/// Flattened outgoing values: the slice for destination 0 comes first, then the slice for
/// destination 1, and so on, with slice lengths given by <paramref name="sendCounts"/>.
/// </param>
/// <param name="sendCounts">Number of elements to send to each process, indexed by rank.</param>
/// <param name="recvCounts">Number of elements expected from each process, indexed by rank.</param>
/// <param name="outValues">
/// Destination array, filled in rank order with slice lengths given by
/// <paramref name="recvCounts"/>. It is not allocated or resized here.
/// </param>
/// <exception cref="ArgumentException">
/// <c>sendCounts[Rank]</c> differs from <c>recvCounts[Rank]</c>. This is detected only after
/// the collective exchange has completed.
/// </exception>
/// <exception cref="InvalidOperationException">
/// A process from which elements were expected sent no data, or the array received from a
/// process does not have the length announced in <paramref name="recvCounts"/>.
/// </exception>
/// <exception cref="OverflowException">
/// A serialized size or offset does not fit in a 32-bit signed integer.
/// </exception>
protected void AlltoallFlattened_serialized<T>(T[] inValues, int[] sendCounts, int[] recvCounts, T[] outValues)
{
    using (UnmanagedMemoryStream sendStream = new UnmanagedMemoryStream())
    {
        // Index in inValues where the slice addressed to this process begins.
        int selfLocation = 0;
        int inLocation = 0;

        // Serialize all of the outgoing data to the outgoing stream, recording the byte
        // offset and byte length of every destination's payload.
        int[] sendOffsets = new int[Size];
        int[] sendCountsSerialized = new int[Size];
        for (int dest = 0; dest < Size; ++dest) checked
        {
            sendOffsets[dest] = Convert.ToInt32(sendStream.Length);
            if (dest == Rank)
            {
                // Our own slice is copied directly later on; nothing is written to the
                // stream, so its serialized length stays zero.
                selfLocation = inLocation;
            }
            else if (sendCounts[dest] > 0)
            {
                var slice = new T[sendCounts[dest]];
                Array.Copy(inValues, inLocation, slice, 0, sendCounts[dest]);
                Serialize(sendStream, slice);
            }
            inLocation += sendCounts[dest];
            sendCountsSerialized[dest] = Convert.ToInt32(sendStream.Length) - sendOffsets[dest];
        }

        // Use all-to-all on integers to tell every process how much data
        // it will be receiving.
        int[] recvCountsSerialized = Alltoall(sendCountsSerialized);

        // Compute the offsets at which each of the streams will be received
        // (running sum of the incoming byte counts).
        int[] recvOffsets = new int[Size];
        recvOffsets[0] = 0;
        for (int i = 1; i < Size; ++i) checked
        {
            recvOffsets[i] = recvOffsets[i - 1] + recvCountsSerialized[i - 1];
        }

        // Total length of the receive buffer
        int recvLength = checked(recvOffsets[Size - 1] + recvCountsSerialized[Size - 1]);

        using (UnmanagedMemoryStream recvStream = new UnmanagedMemoryStream(recvLength))
        {
            // Build receive buffer and exchange all of the data
            unsafe
            {
                int errorCode = Unsafe.MPI_Alltoallv(sendStream.Buffer, sendCountsSerialized, sendOffsets, Unsafe.MPI_BYTE,
                                                     recvStream.Buffer, recvCountsSerialized, recvOffsets, Unsafe.MPI_BYTE, comm);
                if (errorCode != Unsafe.MPI_SUCCESS)
                {
                    throw Environment.TranslateErrorIntoException(errorCode);
                }
            }

            // De-serialize the received data
            int outLocation = 0;
            for (int source = 0; source < Size; ++source) checked
            {
                if (source == Rank)
                {
                    // We never transmitted this object, so we don't need to de-serialize it.
                    if (sendCounts[source] != recvCounts[source])
                    {
                        throw new ArgumentException($"sendCounts[Rank] ({sendCounts[Rank]}) != recvCounts[Rank] ({recvCounts[Rank]})");
                    }
                    Array.Copy(inValues, selfLocation, outValues, outLocation, recvCounts[source]);
                }
                else if (recvCounts[source] > 0)
                {
                    // A peer that sent nothing contributes zero bytes, so its offset is the
                    // same as the next peer's. De-serializing there would read somebody
                    // else's payload (or run off the end of the buffer), so fail explicitly.
                    if (recvCountsSerialized[source] == 0)
                    {
                        throw new InvalidOperationException($"Expected {recvCounts[source]} element(s) from process {source}, but it sent no data");
                    }

                    // Seek to the proper location in the stream and de-serialize
                    recvStream.Position = recvOffsets[source];
                    var received = Deserialize<T[]>(recvStream);
                    if (received.Length != recvCounts[source])
                    {
                        throw new InvalidOperationException($"Deserialized array has length {received.Length}; expected {recvCounts[source]}");
                    }
                    Array.Copy(received, 0, outValues, outLocation, received.Length);
                }
                outLocation += recvCounts[source];
            }
        }
    }
}