in AmbrosiaLib/Ambrosia/Program.cs [1532:1707]
public async Task<long> AddRow(FlexReadBuffer copyFromFlexBuffer,
string outputToUpdate,
long newSeqNo,
long newReplayableSeqNo,
ConcurrentDictionary<string, OutputConnectionRecord> outputs,
InputConnectionRecord associatedInputConnectionRecord)
{
var copyFromBuffer = copyFromFlexBuffer.Buffer;
var length = copyFromFlexBuffer.Length;
while (true)
{
bool sealing = false;
long localStatus;
localStatus = Interlocked.Read(ref _status);
// Yield if the sealed bit is set
while (localStatus % 2 == 1)
{
await Task.Yield();
localStatus = Interlocked.Read(ref _status);
}
var oldBufLength = ((localStatus >> SealedBits) & Last32Mask);
var newLength = oldBufLength + length;
// Assemble the new status
long newLocalStatus;
if ((newLength > _maxBufSize) || (_bufbak != null))
{
// We're going to try to seal the buffer
newLocalStatus = localStatus + 1;
sealing = true;
}
else
{
// We're going to try to add to the end of the existing buffer
var newWrites = (localStatus >> (64 - numWritesBits)) + 1;
newLocalStatus = ((newWrites) << (64 - numWritesBits)) | (newLength << SealedBits);
}
var origVal = Interlocked.CompareExchange(ref _status, newLocalStatus, localStatus);
// Check if the compare and swap succeeded, otherwise try again
if (origVal == localStatus)
{
// We are now preventing recovery until addrow finishes and all resulting commits have completed. We can safely update
// LastProcessedID and LastProcessedReplayableID
associatedInputConnectionRecord.LastProcessedID = newSeqNo;
associatedInputConnectionRecord.LastProcessedReplayableID = newReplayableSeqNo;
if (sealing)
{
// This call successfully sealed the buffer. Remember we still have an extra
// message to take care of
// We have just filled the backup buffer and must wait until any other commit finishes
int counter = 0;
while (_bufbak == null)
{
counter++;
if (counter == 100000)
{
counter = 0;
await Task.Yield();
}
}
// There is no other write going on. Take the backup buffer
var newUncommittedWatermarks = _uncommittedWatermarksBak;
var newWriteBuf = _bufbak;
_bufbak = null;
_uncommittedWatermarksBak = null;
// Wait for other writes to complete before committing
while (true)
{
localStatus = Interlocked.Read(ref _status);
var numWrites = (localStatus >> (64 - numWritesBits));
if (numWrites == 0)
{
break;
}
await Task.Yield();
}
// Filling header with enough info to detect incomplete writes and also writing the page length
var writeStream = new MemoryStream(_buf, 4, 20);
int lengthOnPage;
if (newLength <= _maxBufSize)
{
lengthOnPage = (int)newLength;
}
else
{
lengthOnPage = (int)oldBufLength;
}
writeStream.WriteIntFixed(lengthOnPage);
if (newLength <= _maxBufSize)
{
// Copy the contents into the log record buffer
Buffer.BlockCopy(copyFromBuffer, 0, _buf, (int)oldBufLength, length);
}
ulong checkBytes;
if (length <= (_maxBufSize - HeaderSize))
{
// new message will end up in a commit buffer. Use normal CheckBytes
checkBytes = CheckBytesxxHash64(_buf, HeaderSize, lengthOnPage - HeaderSize);
}
else
{
// new message is too big to land in a commit buffer and will be tacked on the end.
checkBytes = CheckBytesExtraxxHash64(HeaderSize, lengthOnPage - HeaderSize, copyFromBuffer, length);
}
writeStream.WriteULongFixed(checkBytes);
writeStream.WriteLongFixed(_nextWriteID);
_nextWriteID++;
// Do the actual commit
// Grab the current state of trim levels since the last write
// Note that the trim thread may want to modify the table, requiring a lock
ConcurrentDictionary<string, long> oldTrimWatermarks;
lock (_trimWatermarks)
{
oldTrimWatermarks = _trimWatermarks;
_trimWatermarks = _trimWatermarksBak;
_trimWatermarksBak = null;
}
if (newLength <= _maxBufSize)
{
// add row to current buffer and commit
_uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
_lastCommitTask = Commit(_buf, (int)newLength, _uncommittedWatermarks, oldTrimWatermarks, outputs);
newLocalStatus = HeaderSize << SealedBits;
}
else if (length > (_maxBufSize - HeaderSize))
{
// Steal the byte array in the flex buffer to return it after writing
copyFromFlexBuffer.StealBuffer();
// write new event as part of commit
_uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
var commitTask = Commit(_buf, (int)oldBufLength, copyFromBuffer, length, _uncommittedWatermarks, oldTrimWatermarks, outputs);
newLocalStatus = HeaderSize << SealedBits;
}
else
{
// commit and add new event to new buffer
newUncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
_lastCommitTask = Commit(_buf, (int)oldBufLength, _uncommittedWatermarks, oldTrimWatermarks, outputs);
Buffer.BlockCopy(copyFromBuffer, 0, newWriteBuf, (int)HeaderSize, length);
newLocalStatus = (HeaderSize + length) << SealedBits;
}
_buf = newWriteBuf;
_uncommittedWatermarks = newUncommittedWatermarks;
_status = newLocalStatus;
return (long)_logStream.FileSize;
}
// Add the message to the existing buffer
Buffer.BlockCopy(copyFromBuffer, 0, _buf, (int)oldBufLength, length);
_uncommittedWatermarks[outputToUpdate] = new LongPair(newSeqNo, newReplayableSeqNo);
// Reduce write count
while (true)
{
localStatus = Interlocked.Read(ref _status);
var newWrites = (localStatus >> (64 - numWritesBits)) - 1;
newLocalStatus = (localStatus & ((Last32Mask << 1) + 1)) |
(newWrites << (64 - numWritesBits));
origVal = Interlocked.CompareExchange(ref _status, newLocalStatus, localStatus);
if (origVal == localStatus)
{
if (localStatus % 2 == 0 && _bufbak != null)
{
await TryCommitAsync(outputs);
}
return (long)_logStream.FileSize;
}
}
}
}
}