in core/src/main/java/com/jetbrains/youtrackdb/internal/core/storage/impl/local/paginated/wal/cas/CASDiskWriteAheadLog.java [1702:1967]
public void executeWriteRecords(boolean forceSync, boolean fullWrite) {
recordsWriterLock.lock();
try {
if (cancelRecordsWriting) {
return;
}
if (printPerformanceStatistic) {
printReport();
}
final var ts = System.nanoTime();
final var makeFSync = forceSync || ts - lastFSyncTs > fsyncInterval * 1_000_000L;
final var qSize = queueSize.get();
// even if queue is empty we need to write buffer content to the disk if needed
if (qSize > 0 || fullWrite || makeFSync) {
final var fl = new CountDownLatch(1);
flushLatch.lazySet(fl);
try {
// in case of "full write" mode, we log milestone record and iterate over the queue till
// we find it
final MilestoneWALRecord milestoneRecord;
// in case of "full cache" mode we chose last record in the queue, iterate till this
// record and write it if needed
// but do not remove this record from the queue, so we will always have queue with
// record with valid LSN
// if we write last record, we mark it as written, so we do not repeat that again
final WALRecord lastRecord;
// we jump to new page if we need to make fsync or we need to be sure that records are
// written in file system
if (makeFSync || fullWrite) {
segmentLock.sharedLock();
try {
milestoneRecord = logMilestoneRecord();
} finally {
segmentLock.sharedUnlock();
}
lastRecord = null;
} else {
final var cursor = records.peekLast();
assert cursor != null;
lastRecord = cursor.getItem();
assert lastRecord != null;
if (lastRecord.getLsn().getPosition() == -1) {
calculateRecordsLSNs();
}
assert lastRecord.getLsn().getPosition() >= 0;
milestoneRecord = null;
}
while (true) {
final var record = records.peek();
if (record == milestoneRecord) {
break;
}
assert record != null;
final var lsn = record.getLsn();
assert lsn.getSegment() >= segmentId;
if (!(record instanceof MilestoneWALRecord) && !(record instanceof StartWALRecord)) {
if (segmentId != lsn.getSegment()) {
if (walFile != null) {
if (writeBufferPointer != null) {
writeBuffer(walFile, segmentId, writeBuffer, lastLSN);
}
writeBufferPointer = null;
writeBuffer = null;
writeBufferPageIndex = -1;
lastLSN = null;
try {
if (writeFuture != null) {
writeFuture.get();
}
} catch (final InterruptedException e) {
LogManager.instance().error(this, "WAL write was interrupted", e);
}
assert walFile.position() == currentPosition;
fileCloseQueueSize.incrementAndGet();
fileCloseQueue.offer(new RawPairLongObject<>(segmentId, walFile));
}
segmentId = lsn.getSegment();
walFile =
WALFile.createWriteWALFile(
walLocation.resolve(getSegmentName(segmentId)), segmentId);
assert lsn.getPosition() == CASWALPage.RECORDS_OFFSET;
currentPosition = 0;
}
final var writeableRecord = (WriteableWALRecord) record;
if (!writeableRecord.isWritten()) {
var written = 0;
final var recordContentBinarySize = writeableRecord.getBinaryContentLen();
final var bytesToWrite = IntegerSerializer.INT_SIZE + recordContentBinarySize;
final var recordContent = writeableRecord.getBinaryContent();
recordContent.position(0);
byte[] recordSize = null;
var recordSizeWritten = -1;
var recordSizeIsWritten = false;
while (written < bytesToWrite) {
if (writeBuffer == null || writeBuffer.remaining() == 0) {
if (writeBufferPointer != null) {
assert writeBuffer != null;
writeBuffer(walFile, segmentId, writeBuffer, lastLSN);
}
if (useFirstBuffer) {
writeBufferPointer = writeBufferPointerOne;
writeBuffer = writeBufferOne;
} else {
writeBufferPointer = writeBufferPointerTwo;
writeBuffer = writeBufferTwo;
}
writeBuffer.limit(writeBuffer.capacity());
writeBuffer.rewind();
useFirstBuffer = !useFirstBuffer;
writeBufferPageIndex = -1;
lastLSN = null;
}
if (writeBuffer.position() % pageSize == 0) {
writeBufferPageIndex++;
writeBuffer.position(writeBuffer.position() + CASWALPage.RECORDS_OFFSET);
}
assert written != 0
|| currentPosition + writeBuffer.position() == lsn.getPosition()
: (currentPosition + writeBuffer.position()) + " vs " + lsn.getPosition();
final var chunkSize =
Math.min(
bytesToWrite - written,
(writeBufferPageIndex + 1) * pageSize - writeBuffer.position());
assert chunkSize <= maxRecordSize;
assert chunkSize + writeBuffer.position()
<= (writeBufferPageIndex + 1) * pageSize;
assert writeBuffer.position() > writeBufferPageIndex * pageSize;
if (!recordSizeIsWritten) {
if (recordSizeWritten > 0) {
writeBuffer.put(
recordSize,
recordSizeWritten,
IntegerSerializer.INT_SIZE - recordSizeWritten);
written += IntegerSerializer.INT_SIZE - recordSizeWritten;
recordSize = null;
recordSizeWritten = -1;
recordSizeIsWritten = true;
continue;
} else if (IntegerSerializer.INT_SIZE <= chunkSize) {
writeBuffer.putInt(recordContentBinarySize);
written += IntegerSerializer.INT_SIZE;
recordSize = null;
recordSizeWritten = -1;
recordSizeIsWritten = true;
continue;
} else {
recordSize = new byte[IntegerSerializer.INT_SIZE];
IntegerSerializer.serializeNative(
recordContentBinarySize, recordSize, 0);
recordSizeWritten =
(writeBufferPageIndex + 1) * pageSize - writeBuffer.position();
written += recordSizeWritten;
writeBuffer.put(recordSize, 0, recordSizeWritten);
continue;
}
}
recordContent.limit(recordContent.position() + chunkSize);
writeBuffer.put(recordContent);
written += chunkSize;
}
lastLSN = lsn;
queueSize.addAndGet(-writeableRecord.getDiskSize());
writeableRecord.written();
writeableRecord.freeBinaryContent();
}
}
if (lastRecord != record) {
records.poll();
} else {
break;
}
}
if ((makeFSync || fullWrite) && writeBufferPointer != null) {
writeBuffer(walFile, segmentId, writeBuffer, lastLSN);
writeBufferPointer = null;
writeBuffer = null;
writeBufferPageIndex = -1;
lastLSN = null;
}
} finally {
fl.countDown();
}
if (qSize > 0 && ts - segmentAdditionTs >= segmentsInterval) {
appendSegment(currentSegment);
}
}
if (makeFSync) {
try {
try {
if (writeFuture != null) {
writeFuture.get();
}
} catch (final InterruptedException e) {
LogManager.instance().error(this, "WAL write was interrupted", e);
}
assert walFile == null || walFile.position() == currentPosition;
writeFuture =
writeExecutor.submit(
(Callable<?>)
() -> {
executeSyncAndCloseFile();
//noinspection ReturnOfNull
return null;
});
} finally {
lastFSyncTs = ts;
}
}
} catch (final IOException | ExecutionException e) {
LogManager.instance().error(this, "Error during WAL writing", e);
throw new IllegalStateException(e);
} catch (final RuntimeException | Error e) {
LogManager.instance().error(this, "Error during WAL writing", e);
throw e;
} finally {
recordsWriterLock.unlock();
}
}