protected CompletableFuture appendEntryImpl()

in ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java [420:478]


  protected CompletableFuture<Long> appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
      TransactionContext context) {
    checkLogState();
    LogEntryProto entry = entryRef.retain();
    if (LOG.isTraceEnabled()) {
      LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
    }
    final LogEntryProto removedStateMachineData = LogProtoUtils.removeStateMachineData(entry);
    try(AutoCloseableLock writeLock = writeLock()) {
      final Timekeeper.Context appendEntryTimerContext = getRaftLogMetrics().startAppendEntryTimer();
      validateLogEntry(entry);
      final LogSegment currentOpenSegment = cache.getOpenSegment();
      boolean rollOpenSegment = false;
      if (currentOpenSegment == null) {
        cache.addOpenSegment(entry.getIndex());
        fileLogWorker.startLogSegment(entry.getIndex());
      } else if (isSegmentFull(currentOpenSegment, removedStateMachineData)) {
        rollOpenSegment = true;
      } else {
        final TermIndex last = currentOpenSegment.getLastTermIndex();
        if (last != null && last.getTerm() != entry.getTerm()) {
          // the term changes
          Preconditions.assertTrue(last.getTerm() < entry.getTerm(),
              "open segment's term %s is larger than the new entry's term %s",
              last.getTerm(), entry.getTerm());
          rollOpenSegment = true;
        }
      }

      if (rollOpenSegment) {
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
        cacheEviction.signal();
      }

      // If the entry has state machine data, then the entry should be inserted
      // to statemachine first and then to the cache. Not following the order
      // will leave a spurious entry in the cache.
      final Task write = fileLogWorker.writeLogEntry(entryRef, removedStateMachineData, context);
      if (stateMachineCachingEnabled) {
        // The stateMachineData will be cached inside the StateMachine itself.
        if (removedStateMachineData != entry) {
          cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
              ReferenceCountedObject.wrap(removedStateMachineData));
        } else {
          cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
              ReferenceCountedObject.wrap(LogProtoUtils.copy(removedStateMachineData)));
        }
      } else {
        cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, entryRef);
      }
      return write.getFuture().whenComplete((clientReply, exception) -> appendEntryTimerContext.stop());
    } catch (Exception e) {
      LOG.error("{}: Failed to append {}", getName(), toLogEntryString(entry), e);
      throw e;
    } finally {
      entryRef.release();
    }
  }