internal async Task SendAsync()

in AmbrosiaLib/Ambrosia/Program.cs [480:647]


        internal async Task<BuffersCursor> SendAsync(Stream outputStream,
                                                     BuffersCursor placeToStart)
        {
            // If the cursor is invalid because of trimming or reconnecting, create it again
            if (placeToStart.PagePos == -1)
            {
                return await ReplayFromAsync(outputStream, _owningOutputRecord.LastSeqSentToReceiver + 1);

            }
            var nextSeqNo = _owningOutputRecord.LastSeqSentToReceiver + 1;
            var bufferEnumerator = placeToStart.PageEnumerator;
            var posToStart = placeToStart.PagePos;
            var relSeqPos = placeToStart.RelSeqPos;

            // We are guaranteed to have an enumerator and starting point. Must send output.
            AcquireAppendLock(2);
            bool needToUnlockAtEnd = true;
            do
            {
                var curBuffer = bufferEnumerator.Current;
                var pageLength = curBuffer.curLength;
                var morePages = (curBuffer != _bufferQ.Last());
                int numReplayableMessagesToSend;
                if (posToStart == 0)
                {
                    // We are starting to send contents of the page. Send everything
                    numReplayableMessagesToSend = (int)curBuffer.TotalReplayableMessages;
                }
                else
                {
                    // We are in the middle of sending this page. Respect the previously set counter
                    numReplayableMessagesToSend = (int)curBuffer.UnsentReplayableMessages;
                }
                int numRPCs = (int)(curBuffer.HighestSeqNo - curBuffer.LowestSeqNo + 1 - relSeqPos);
                curBuffer.UnsentReplayableMessages = 0;
                ReleaseAppendLock();
                Debug.Assert((nextSeqNo == curBuffer.LowestSeqNo + relSeqPos) && (nextSeqNo >= curBuffer.LowestSeqNo) && ((nextSeqNo + numRPCs - 1) <= curBuffer.HighestSeqNo));
                ReleaseTrimLock();
                // send the buffer
                if (pageLength - posToStart > 0)
                {
                    // We really have output to send. Send it.
                    //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Uncomment/Comment for testing
                    //StartupParamOverrides.OutputStream.WriteLine("Wrote from {0} to {1}, {2}", curBuffer.LowestSeqNo, curBuffer.HighestSeqNo, morePages);
                    int bytesInBatchData = pageLength - posToStart;
                    if (numRPCs > 1)
                    {
                        if (numReplayableMessagesToSend == numRPCs)
                        {
                            // writing a batch
                            outputStream.WriteInt(bytesInBatchData + 1 + StreamCommunicator.IntSize(numRPCs));
                            outputStream.WriteByte(AmbrosiaRuntime.RPCBatchByte);
                            outputStream.WriteInt(numRPCs);
#if DEBUG
                            try
                            {
                                curBuffer.CheckSendBytes(posToStart, numRPCs, pageLength - posToStart);
                            }
                            catch (Exception e)
                            {
                                Trace.TraceError("Error sending partial page, checking page integrity: {0}", e.Message);
                                curBuffer.CheckPageIntegrity();
                                throw e;
                            }
#endif
                            await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
                            await outputStream.FlushAsync();
                        }
                        else
                        {
                            // writing a mixed batch
                            outputStream.WriteInt(bytesInBatchData + 1 + StreamCommunicator.IntSize(numRPCs) + StreamCommunicator.IntSize(numReplayableMessagesToSend));
                            outputStream.WriteByte(AmbrosiaRuntime.CountReplayableRPCBatchByte);
                            outputStream.WriteInt(numRPCs);
                            outputStream.WriteInt(numReplayableMessagesToSend);
#if DEBUG
                            try
                            {
                                curBuffer.CheckSendBytes(posToStart, numRPCs, pageLength - posToStart);
                            }
                            catch (Exception e)
                            {
                                Trace.TraceError("Error sending partial page, checking page integrity: {0}", e.Message);
//                                StartupParamOverrides.OutputStream.WriteLine("Error sending partial page, checking page integrity: {0}", e.Message);
                                curBuffer.CheckPageIntegrity();
                                throw e;
                            }
#endif
                            await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
                            await outputStream.FlushAsync();
                        }
                    }
                    else
                    {
                        // writing individual RPCs
                        await outputStream.WriteAsync(curBuffer.PageBytes, posToStart, bytesInBatchData);
                        await outputStream.FlushAsync();
                    }
                }
                AcquireTrimLock(2);
                _owningOutputRecord.LastSeqSentToReceiver += numRPCs;

                Debug.Assert((_owningOutputRecord.placeInOutput != null) && (_owningOutputRecord.placeInOutput.PageEnumerator != null)); // Used to check these, but they should always be true now that there are no recursive SendAsync calls.

                var trimResetIterator = _owningOutputRecord.placeInOutput.PagePos == -1;

                var trimPushedIterator = !trimResetIterator && (bufferEnumerator.Current != curBuffer);

                // Must handle cases where trim came in during the actual send and reset the iterator
                if (trimResetIterator)
                {
                    Debug.Assert(!morePages);
                    // Done outputting. Just return the enumerator replacement
                    return _owningOutputRecord.placeInOutput;
                }
                else
                {
                    Debug.Assert((bufferEnumerator.Current != curBuffer) || ((nextSeqNo == curBuffer.LowestSeqNo + relSeqPos) && (nextSeqNo >= curBuffer.LowestSeqNo) && ((nextSeqNo + numRPCs - 1) <= curBuffer.HighestSeqNo)));
                    nextSeqNo += numRPCs;

                    if (trimPushedIterator)
                    {
                        Debug.Assert(placeToStart.PagePos == 0 && placeToStart.RelSeqPos == 0);

                        if (morePages)
                        {
                            AcquireAppendLock(2);
                        }
                        else
                        {
                            needToUnlockAtEnd = false;
                            break;
                        }
                    }
                    else // trim didn't alter the iterator at all
                    {
                        if (morePages)
                        {
                            placeToStart.PagePos = 0;
                            placeToStart.RelSeqPos = 0;
                            AcquireAppendLock(2);
                            var moveNextResult = bufferEnumerator.MoveNext();
                            Debug.Assert(moveNextResult);
                        }
                        else
                        {
                            placeToStart.PagePos = pageLength;
                            placeToStart.RelSeqPos = relSeqPos + numRPCs;
                            needToUnlockAtEnd = false;
                            break;
                        }
                    }
                }

                nextSeqNo = _owningOutputRecord.LastSeqSentToReceiver + 1;
                bufferEnumerator = placeToStart.PageEnumerator;
                posToStart = placeToStart.PagePos;
                relSeqPos = placeToStart.RelSeqPos;
            }
            while (true);
            Debug.Assert(placeToStart.PageEnumerator == bufferEnumerator); // Used to set this rather than compare, but they should never be different. May be different due to reconnection!!!!!!!!!!!!!!! If they are different due to reconnection or something, don't know why we'd want to make them the same
            if (needToUnlockAtEnd)
            {
                Debug.Assert(false); // Is this ever actually hit? If not, we should eventually get rid of needToUnlockAtEnd and this whole if.
                ReleaseAppendLock();
            }
            return placeToStart;
        }