public void executeWriteRecords()

in core/src/main/java/com/jetbrains/youtrackdb/internal/core/storage/impl/local/paginated/wal/cas/CASDiskWriteAheadLog.java [1702:1967]


  public void executeWriteRecords(boolean forceSync, boolean fullWrite) {
    recordsWriterLock.lock();
    try {
      if (cancelRecordsWriting) {
        return;
      }

      if (printPerformanceStatistic) {
        printReport();
      }

      final var ts = System.nanoTime();
      final var makeFSync = forceSync || ts - lastFSyncTs > fsyncInterval * 1_000_000L;
      final var qSize = queueSize.get();

      // even if queue is empty we need to write buffer content to the disk if needed
      if (qSize > 0 || fullWrite || makeFSync) {
        final var fl = new CountDownLatch(1);
        flushLatch.lazySet(fl);
        try {
          // in case of "full write" mode, we log milestone record and iterate over the queue till
          // we find it
          final MilestoneWALRecord milestoneRecord;
          // in case of "full cache" mode we chose last record in the queue, iterate till this
          // record and write it if needed
          // but do not remove this record from the queue, so we will always have queue with
          // record with valid LSN
          // if we write last record, we mark it as written, so we do not repeat that again
          final WALRecord lastRecord;

          // we jump to new page if we need to make fsync or we need to be sure that records are
          // written in file system
          if (makeFSync || fullWrite) {
            segmentLock.sharedLock();
            try {
              milestoneRecord = logMilestoneRecord();
            } finally {
              segmentLock.sharedUnlock();
            }

            lastRecord = null;
          } else {

            final var cursor = records.peekLast();
            assert cursor != null;

            lastRecord = cursor.getItem();
            assert lastRecord != null;

            if (lastRecord.getLsn().getPosition() == -1) {
              calculateRecordsLSNs();
            }

            assert lastRecord.getLsn().getPosition() >= 0;
            milestoneRecord = null;
          }

          while (true) {
            final var record = records.peek();

            if (record == milestoneRecord) {
              break;
            }

            assert record != null;
            final var lsn = record.getLsn();

            assert lsn.getSegment() >= segmentId;

            if (!(record instanceof MilestoneWALRecord) && !(record instanceof StartWALRecord)) {
              if (segmentId != lsn.getSegment()) {
                if (walFile != null) {
                  if (writeBufferPointer != null) {
                    writeBuffer(walFile, segmentId, writeBuffer, lastLSN);
                  }

                  writeBufferPointer = null;
                  writeBuffer = null;
                  writeBufferPageIndex = -1;

                  lastLSN = null;

                  try {
                    if (writeFuture != null) {
                      writeFuture.get();
                    }
                  } catch (final InterruptedException e) {
                    LogManager.instance().error(this, "WAL write was interrupted", e);
                  }

                  assert walFile.position() == currentPosition;

                  fileCloseQueueSize.incrementAndGet();
                  fileCloseQueue.offer(new RawPairLongObject<>(segmentId, walFile));
                }

                segmentId = lsn.getSegment();

                walFile =
                    WALFile.createWriteWALFile(
                        walLocation.resolve(getSegmentName(segmentId)), segmentId);
                assert lsn.getPosition() == CASWALPage.RECORDS_OFFSET;
                currentPosition = 0;
              }

              final var writeableRecord = (WriteableWALRecord) record;

              if (!writeableRecord.isWritten()) {
                var written = 0;
                final var recordContentBinarySize = writeableRecord.getBinaryContentLen();
                final var bytesToWrite = IntegerSerializer.INT_SIZE + recordContentBinarySize;

                final var recordContent = writeableRecord.getBinaryContent();
                recordContent.position(0);

                byte[] recordSize = null;
                var recordSizeWritten = -1;

                var recordSizeIsWritten = false;

                while (written < bytesToWrite) {
                  if (writeBuffer == null || writeBuffer.remaining() == 0) {
                    if (writeBufferPointer != null) {
                      assert writeBuffer != null;
                      writeBuffer(walFile, segmentId, writeBuffer, lastLSN);
                    }

                    if (useFirstBuffer) {
                      writeBufferPointer = writeBufferPointerOne;
                      writeBuffer = writeBufferOne;
                    } else {
                      writeBufferPointer = writeBufferPointerTwo;
                      writeBuffer = writeBufferTwo;
                    }

                    writeBuffer.limit(writeBuffer.capacity());
                    writeBuffer.rewind();
                    useFirstBuffer = !useFirstBuffer;

                    writeBufferPageIndex = -1;

                    lastLSN = null;
                  }

                  if (writeBuffer.position() % pageSize == 0) {
                    writeBufferPageIndex++;
                    writeBuffer.position(writeBuffer.position() + CASWALPage.RECORDS_OFFSET);
                  }

                  assert written != 0
                      || currentPosition + writeBuffer.position() == lsn.getPosition()
                      : (currentPosition + writeBuffer.position()) + " vs " + lsn.getPosition();
                  final var chunkSize =
                      Math.min(
                          bytesToWrite - written,
                          (writeBufferPageIndex + 1) * pageSize - writeBuffer.position());
                  assert chunkSize <= maxRecordSize;
                  assert chunkSize + writeBuffer.position()
                      <= (writeBufferPageIndex + 1) * pageSize;
                  assert writeBuffer.position() > writeBufferPageIndex * pageSize;

                  if (!recordSizeIsWritten) {
                    if (recordSizeWritten > 0) {
                      writeBuffer.put(
                          recordSize,
                          recordSizeWritten,
                          IntegerSerializer.INT_SIZE - recordSizeWritten);
                      written += IntegerSerializer.INT_SIZE - recordSizeWritten;

                      recordSize = null;
                      recordSizeWritten = -1;
                      recordSizeIsWritten = true;
                      continue;
                    } else if (IntegerSerializer.INT_SIZE <= chunkSize) {
                      writeBuffer.putInt(recordContentBinarySize);
                      written += IntegerSerializer.INT_SIZE;

                      recordSize = null;
                      recordSizeWritten = -1;
                      recordSizeIsWritten = true;
                      continue;
                    } else {
                      recordSize = new byte[IntegerSerializer.INT_SIZE];
                      IntegerSerializer.serializeNative(
                          recordContentBinarySize, recordSize, 0);

                      recordSizeWritten =
                          (writeBufferPageIndex + 1) * pageSize - writeBuffer.position();
                      written += recordSizeWritten;

                      writeBuffer.put(recordSize, 0, recordSizeWritten);
                      continue;
                    }
                  }

                  recordContent.limit(recordContent.position() + chunkSize);
                  writeBuffer.put(recordContent);
                  written += chunkSize;
                }

                lastLSN = lsn;

                queueSize.addAndGet(-writeableRecord.getDiskSize());
                writeableRecord.written();
                writeableRecord.freeBinaryContent();
              }
            }
            if (lastRecord != record) {
              records.poll();
            } else {
              break;
            }
          }

          if ((makeFSync || fullWrite) && writeBufferPointer != null) {
            writeBuffer(walFile, segmentId, writeBuffer, lastLSN);

            writeBufferPointer = null;
            writeBuffer = null;
            writeBufferPageIndex = -1;

            lastLSN = null;
          }
        } finally {
          fl.countDown();
        }

        if (qSize > 0 && ts - segmentAdditionTs >= segmentsInterval) {
          appendSegment(currentSegment);
        }
      }

      if (makeFSync) {
        try {
          try {
            if (writeFuture != null) {
              writeFuture.get();
            }
          } catch (final InterruptedException e) {
            LogManager.instance().error(this, "WAL write was interrupted", e);
          }

          assert walFile == null || walFile.position() == currentPosition;

          writeFuture =
              writeExecutor.submit(
                  (Callable<?>)
                      () -> {
                        executeSyncAndCloseFile();
                        //noinspection ReturnOfNull
                        return null;
                      });
        } finally {
          lastFSyncTs = ts;
        }
      }
    } catch (final IOException | ExecutionException e) {
      LogManager.instance().error(this, "Error during WAL writing", e);
      throw new IllegalStateException(e);
    } catch (final RuntimeException | Error e) {
      LogManager.instance().error(this, "Error during WAL writing", e);
      throw e;
    } finally {
      recordsWriterLock.unlock();
    }
  }