in AmbrosiaLib/Ambrosia/Program.cs [480:647]
internal async Task<BuffersCursor> SendAsync(Stream outputStream,
BuffersCursor placeToStart)
{
// If the cursor is invalid because of trimming or reconnecting, create it again
if (placeToStart.PagePos == -1)
{
return await ReplayFromAsync(outputStream, _owningOutputRecord.LastSeqSentToReceiver + 1);
}
var nextSeqNo = _owningOutputRecord.LastSeqSentToReceiver + 1;
var bufferEnumerator = placeToStart.PageEnumerator;
var posToStart = placeToStart.PagePos;
var relSeqPos = placeToStart.RelSeqPos;
// We are guaranteed to have an enumerator and starting point. Must send output.
AcquireAppendLock(2);
bool needToUnlockAtEnd = true;
do
{
var curBuffer = bufferEnumerator.Current;
var pageLength = curBuffer.curLength;
var morePages = (curBuffer != _bufferQ.Last());
int numReplayableMessagesToSend;
if (posToStart == 0)
{
// We are starting to send contents of the page. Send everything
numReplayableMessagesToSend = (int)curBuffer.TotalReplayableMessages;
}
else
{
// We are in the middle of sending this page. Respect the previously set counter
numReplayableMessagesToSend = (int)curBuffer.UnsentReplayableMessages;
}
int numRPCs = (int)(curBuffer.HighestSeqNo - curBuffer.LowestSeqNo + 1 - relSeqPos);
curBuffer.UnsentReplayableMessages = 0;
ReleaseAppendLock();
Debug.Assert((nextSeqNo == curBuffer.LowestSeqNo + relSeqPos) && (nextSeqNo >= curBuffer.LowestSeqNo) && ((nextSeqNo + numRPCs - 1) <= curBuffer.HighestSeqNo));
ReleaseTrimLock();
// send the buffer
if (pageLength - posToStart > 0)
{
// We really have output to send. Send it.
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Uncomment/Comment for testing
//StartupParamOverrides.OutputStream.WriteLine("Wrote from {0} to {1}, {2}", curBuffer.LowestSeqNo, curBuffer.HighestSeqNo, morePages);
int bytesInBatchData = pageLength - posToStart;
if (numRPCs > 1)
{
if (numReplayableMessagesToSend == numRPCs)
{
// writing a batch
outputStream.WriteInt(bytesInBatchData + 1 + StreamCommunicator.IntSize(numRPCs));
outputStream.WriteByte(AmbrosiaRuntime.RPCBatchByte);
outputStream.WriteInt(numRPCs);
#if DEBUG
try
{
curBuffer.CheckSendBytes(posToStart, numRPCs, pageLength - posToStart);
}
catch (Exception e)
{
Trace.TraceError("Error sending partial page, checking page integrity: {0}", e.Message);
curBuffer.CheckPageIntegrity();
throw e;
}
#endif
await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
await outputStream.FlushAsync();
}
else
{
// writing a mixed batch
outputStream.WriteInt(bytesInBatchData + 1 + StreamCommunicator.IntSize(numRPCs) + StreamCommunicator.IntSize(numReplayableMessagesToSend));
outputStream.WriteByte(AmbrosiaRuntime.CountReplayableRPCBatchByte);
outputStream.WriteInt(numRPCs);
outputStream.WriteInt(numReplayableMessagesToSend);
#if DEBUG
try
{
curBuffer.CheckSendBytes(posToStart, numRPCs, pageLength - posToStart);
}
catch (Exception e)
{
Trace.TraceError("Error sending partial page, checking page integrity: {0}", e.Message);
// StartupParamOverrides.OutputStream.WriteLine("Error sending partial page, checking page integrity: {0}", e.Message);
curBuffer.CheckPageIntegrity();
throw e;
}
#endif
await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
await outputStream.FlushAsync();
}
}
else
{
// writing individual RPCs
await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
await outputStream.FlushAsync();
}
}
AcquireTrimLock(2);
_owningOutputRecord.LastSeqSentToReceiver += numRPCs;
Debug.Assert((_owningOutputRecord.placeInOutput != null) && (_owningOutputRecord.placeInOutput.PageEnumerator != null)); // Used to check these, but they should always be true now that there are no recursive SendAsync calls.
var trimResetIterator = _owningOutputRecord.placeInOutput.PagePos == -1;
var trimPushedIterator = !trimResetIterator && (bufferEnumerator.Current != curBuffer);
// Must handle cases where trim came in during the actual send and reset the iterator
if (trimResetIterator)
{
Debug.Assert(!morePages);
// Done outputting. Just return the enumerator replacement
return _owningOutputRecord.placeInOutput;
}
else
{
Debug.Assert((bufferEnumerator.Current != curBuffer) || ((nextSeqNo == curBuffer.LowestSeqNo + relSeqPos) && (nextSeqNo >= curBuffer.LowestSeqNo) && ((nextSeqNo + numRPCs - 1) <= curBuffer.HighestSeqNo)));
nextSeqNo += numRPCs;
if (trimPushedIterator)
{
Debug.Assert(placeToStart.PagePos == 0 && placeToStart.RelSeqPos == 0);
if (morePages)
{
AcquireAppendLock(2);
}
else
{
needToUnlockAtEnd = false;
break;
}
}
else // trim didn't alter the iterator at all
{
if (morePages)
{
placeToStart.PagePos = 0;
placeToStart.RelSeqPos = 0;
AcquireAppendLock(2);
var moveNextResult = bufferEnumerator.MoveNext();
Debug.Assert(moveNextResult);
}
else
{
placeToStart.PagePos = pageLength;
placeToStart.RelSeqPos = relSeqPos + numRPCs;
needToUnlockAtEnd = false;
break;
}
}
}
nextSeqNo = _owningOutputRecord.LastSeqSentToReceiver + 1;
bufferEnumerator = placeToStart.PageEnumerator;
posToStart = placeToStart.PagePos;
relSeqPos = placeToStart.RelSeqPos;
}
while (true);
Debug.Assert(placeToStart.PageEnumerator == bufferEnumerator); // Used to set this rather than compare, but they should never be different. May be different due to reconnection!!!!!!!!!!!!!!! If they are different due to reconnection or something, don't know why we'd want to make them the same
if (needToUnlockAtEnd)
{
Debug.Assert(false); // Is this ever actually hit? If not, we should eventually get rid of needToUnlockAtEnd and this whole if.
ReleaseAppendLock();
}
return placeToStart;
}