public async Task<long> AddRow(...)

in AmbrosiaLib/Ambrosia/Program.cs [1532:1707]


            /// <summary>
            /// Adds one serialized message to the current log page buffer (<c>_buf</c>). If this call is the
            /// one that seals the page, it instead writes the page header, starts a <c>Commit</c> of the page,
            /// and installs the backup buffer (<c>_bufbak</c>) as the new current buffer.
            /// </summary>
            /// <remarks>
            /// Coordination is done through the packed 64-bit <c>_status</c> word, updated with
            /// <see cref="Interlocked.CompareExchange(ref long, long, long)"/>. As read by this method:
            /// <list type="bullet">
            /// <item><description>the lowest bit is the "sealed" flag (<c>status % 2 == 1</c>);</description></item>
            /// <item><description><c>(status &gt;&gt; SealedBits) &amp; Last32Mask</c> is the number of bytes used in <c>_buf</c>;</description></item>
            /// <item><description><c>status &gt;&gt; (64 - numWritesBits)</c> is the count of in-flight (reserved but not yet finished) writes.</description></item>
            /// </list>
            /// A call either reserves space and bumps the write count (append path), or sets the sealed flag
            /// (sealing path) when the message does not fit or a backup buffer is currently available.
            /// The task returned by <c>Commit</c> is stored but not awaited inside this method.
            /// </remarks>
            /// <param name="copyFromFlexBuffer">Source of the message bytes; its <c>Buffer</c> and <c>Length</c> are read, and
            /// its buffer is stolen via <c>StealBuffer()</c> when the message is too large for a page.</param>
            /// <param name="outputToUpdate">Key under which the new sequence numbers are recorded in the uncommitted watermarks.</param>
            /// <param name="newSeqNo">Sequence number stored as <c>LastProcessedID</c> and in the watermark pair.</param>
            /// <param name="newReplayableSeqNo">Sequence number stored as <c>LastProcessedReplayableID</c> and in the watermark pair.</param>
            /// <param name="outputs">Output connection records, passed through to <c>Commit</c> / <c>TryCommitAsync</c>.</param>
            /// <param name="associatedInputConnectionRecord">Input record whose last-processed IDs are updated once the status CAS succeeds.</param>
            /// <returns><c>_logStream.FileSize</c> cast to <c>long</c>, read after the row was added or the commit was started.</returns>
            public async Task<long> AddRow(FlexReadBuffer copyFromFlexBuffer,
                                           string outputToUpdate,
                                           long newSeqNo,
                                           long newReplayableSeqNo,
                                           ConcurrentDictionary<string, OutputConnectionRecord> outputs,
                                           InputConnectionRecord associatedInputConnectionRecord)
            {
                var copyFromBuffer = copyFromFlexBuffer.Buffer;
                var length = copyFromFlexBuffer.Length;
                // CAS retry loop: each iteration snapshots _status, computes the desired new status,
                // and tries to install it. On failure (another thread changed _status) we start over.
                while (true)
                {
                    bool sealing = false;
                    long localStatus;
                    localStatus = Interlocked.Read(ref _status);

                    // Yield if the sealed bit is set
                    // (the low bit stays set until the sealing call stores a fresh status at the end of its path)
                    while (localStatus % 2 == 1)
                    {
                        await Task.Yield();
                        localStatus = Interlocked.Read(ref _status);
                    }
                    // Bytes currently used in _buf, and what the usage would be with this message appended
                    var oldBufLength = ((localStatus >> SealedBits) & Last32Mask);
                    var newLength = oldBufLength + length;

                    // Assemble the new status 
                    long newLocalStatus;
                    // Seal when the message does not fit, or whenever a backup buffer is available
                    // (in the latter case newLength may still be <= _maxBufSize; see the first commit case below)
                    if ((newLength > _maxBufSize) || (_bufbak != null))
                    {
                        // We're going to try to seal the buffer
                        // Adding 1 sets the low (sealed) bit, which is known to be 0 here; length and write count are unchanged
                        newLocalStatus = localStatus + 1;
                        sealing = true;
                    }
                    else
                    {
                        // We're going to try to add to the end of the existing buffer
                        // Reserve [oldBufLength, newLength) and increment the in-flight write count in one status word
                        var newWrites = (localStatus >> (64 - numWritesBits)) + 1;
                        newLocalStatus = ((newWrites) << (64 - numWritesBits)) | (newLength << SealedBits);
                    }
                    var origVal = Interlocked.CompareExchange(ref _status, newLocalStatus, localStatus);

                    // Check if the compare and swap succeeded, otherwise try again
                    if (origVal == localStatus)
                    {
                        // We are now preventing recovery until addrow finishes and all resulting commits have completed. We can safely update
                        // LastProcessedID and LastProcessedReplayableID
                        associatedInputConnectionRecord.LastProcessedID = newSeqNo;
                        associatedInputConnectionRecord.LastProcessedReplayableID = newReplayableSeqNo;
                        if (sealing)
                        {
                            // This call successfully sealed the buffer. Remember we still have an extra
                            // message to take care of

                            // We have just filled the backup buffer and must wait until any other commit finishes
                            // Spin until _bufbak becomes non-null, yielding once every 100000 iterations.
                            // NOTE(review): _bufbak is presumably restored by a completing commit elsewhere; this
                            // spin relies on the field being re-read each iteration (e.g. declared volatile) - confirm.
                            int counter = 0;
                            while (_bufbak == null)
                            {
                                counter++;
                                if (counter == 100000)
                                {
                                    counter = 0;
                                    await Task.Yield();
                                }
                            }

                            // There is no other write going on. Take the backup buffer
                            // (and its watermark table); both backup fields are cleared to mark them as in use
                            var newUncommittedWatermarks = _uncommittedWatermarksBak;
                            var newWriteBuf = _bufbak;
                            _bufbak = null;
                            _uncommittedWatermarksBak = null;

                            // Wait for other writes to complete before committing
                            // (append-path callers decrement the write count after copying their bytes into _buf)
                            while (true)
                            {
                                localStatus = Interlocked.Read(ref _status);
                                var numWrites = (localStatus >> (64 - numWritesBits));
                                if (numWrites == 0)
                                {
                                    break;
                                }
                                await Task.Yield();
                            }

                            // Filling header with enough info to detect incomplete writes and also writing the page length
                            // The stream is a 20-byte window starting at offset 4 of _buf; three values are written
                            // into it in order: page length, check bytes, write ID.
                            // NOTE(review): the MemoryStream is not disposed; it wraps an existing array.
                            var writeStream = new MemoryStream(_buf, 4, 20);
                            // Length recorded on the page: includes the new message only if it fits in this page
                            int lengthOnPage;
                            if (newLength <= _maxBufSize)
                            {
                                lengthOnPage = (int)newLength;
                            }
                            else
                            {
                                lengthOnPage = (int)oldBufLength;
                            }
                            writeStream.WriteIntFixed(lengthOnPage);
                            if (newLength <= _maxBufSize)
                            {
                                // Copy the contents into the log record buffer
                                // (must happen before the check bytes are computed over the page below)
                                Buffer.BlockCopy(copyFromBuffer, 0, _buf, (int)oldBufLength, length);
                            }
                            ulong checkBytes;
                            if (length <= (_maxBufSize - HeaderSize))
                            {
                                // new message will end up in a commit buffer. Use normal CheckBytes
                                checkBytes = CheckBytesxxHash64(_buf, HeaderSize, lengthOnPage - HeaderSize);
                            }
                            else
                            {
                                // new message is too big to land in a commit buffer and will be tacked on the end.
                                checkBytes = CheckBytesExtraxxHash64(HeaderSize, lengthOnPage - HeaderSize, copyFromBuffer, length);
                            }
                            writeStream.WriteULongFixed(checkBytes);
                            writeStream.WriteLongFixed(_nextWriteID);
                            _nextWriteID++;

                            // Do the actual commit
                            // Grab the current state of trim levels since the last write
                            // Note that the trim thread may want to modify the table, requiring a lock
                            // NOTE(review): the lock object is the _trimWatermarks field itself, which is reassigned
                            // inside the lock; later lockers will lock a different object - confirm this is intended.
                            ConcurrentDictionary<string, long> oldTrimWatermarks;
                            lock (_trimWatermarks)
                            {
                                oldTrimWatermarks = _trimWatermarks;
                                _trimWatermarks = _trimWatermarksBak;
                                _trimWatermarksBak = null;
                            }
                            // Three cases for the message that triggered the seal:
                            //   1. it fits in the current page            -> already copied above; commit page including it
                            //   2. it can never fit in a page by itself    -> commit old page plus the message as a second buffer
                            //   3. it fits in a page but not this one      -> commit old page; message starts the new buffer
                            if (newLength <= _maxBufSize)
                            {
                                // add row to current buffer and commit
                                _uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
                                _lastCommitTask = Commit(_buf, (int)newLength, _uncommittedWatermarks, oldTrimWatermarks, outputs);
                                // New buffer starts empty apart from the header
                                newLocalStatus = HeaderSize << SealedBits;
                            }
                            else if (length > (_maxBufSize - HeaderSize))
                            {
                                // Steal the byte array in the flex buffer to return it after writing
                                copyFromFlexBuffer.StealBuffer();
                                // write new event as part of commit
                                _uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
                                // NOTE(review): unlike the other two branches, the result is stored in an unused local
                                // rather than in _lastCommitTask - confirm whether that is intentional.
                                var commitTask = Commit(_buf, (int)oldBufLength, copyFromBuffer, length, _uncommittedWatermarks, oldTrimWatermarks, outputs);
                                newLocalStatus = HeaderSize << SealedBits;
                            }
                            else
                            {
                                // commit and add new event to new buffer
                                // The watermark goes into the NEW table because the message is not part of the page being committed
                                newUncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
                                _lastCommitTask = Commit(_buf, (int)oldBufLength, _uncommittedWatermarks, oldTrimWatermarks, outputs);
                                Buffer.BlockCopy(copyFromBuffer, 0, newWriteBuf, (int)HeaderSize, length);
                                newLocalStatus = (HeaderSize + length) << SealedBits;
                            }
                            // Swap in the fresh buffer and watermark table, then store the new status last.
                            // The new status holds only a length shifted by SealedBits, so (assuming SealedBits >= 1)
                            // the sealed bit is clear again and callers waiting at the top of the loop can proceed.
                            _buf = newWriteBuf;
                            _uncommittedWatermarks = newUncommittedWatermarks;
                            _status = newLocalStatus;
                            return (long)_logStream.FileSize;
                        }
                        // Add the message to the existing buffer
                        // (the region [oldBufLength, newLength) was reserved for this call by the successful CAS above)
                        Buffer.BlockCopy(copyFromBuffer, 0, _buf, (int)oldBufLength, length);
                        _uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
                        // Reduce write count
                        // CAS loop: keep the low bits of the status selected by the mask (Last32Mask << 1) + 1
                        // and replace the write count in the top bits with count - 1.
                        while (true)
                        {
                            localStatus = Interlocked.Read(ref _status);
                            var newWrites = (localStatus >> (64 - numWritesBits)) - 1;
                            newLocalStatus = (localStatus & ((Last32Mask << 1) + 1)) |
                                          (newWrites << (64 - numWritesBits));
                            origVal = Interlocked.CompareExchange(ref _status, newLocalStatus, localStatus);
                            if (origVal == localStatus)
                            {
                                // If the page was not sealed at the time of the decrement and a backup buffer
                                // is available, attempt a commit now
                                if (localStatus % 2 == 0 && _bufbak != null)
                                {
                                    await TryCommitAsync(outputs);
                                }
                                return (long)_logStream.FileSize;
                            }
                        }
                    }
                }
            }