in ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java [420:478]
protected CompletableFuture<Long> appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
    TransactionContext context) {
  // Appends a single entry to this log:
  //   1. validates the entry under the write lock,
  //   2. opens a new segment, or rolls the open one when it is full or the term changed,
  //   3. submits the write to fileLogWorker,
  //   4. adds the entry to the cache.
  // Returns the write task's future, with a completion callback that stops the append-entry timer.
  // Any exception is logged and rethrown unchanged.
  checkLogState();

  // Retain for the duration of this call; the matching release() is in the finally clause below.
  final LogEntryProto logEntry = entryRef.retain();
  if (LOG.isTraceEnabled()) {
    LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(logEntry));
  }
  final LogEntryProto entryWithoutData = LogProtoUtils.removeStateMachineData(logEntry);

  try (AutoCloseableLock lock = writeLock()) {
    final Timekeeper.Context timer = getRaftLogMetrics().startAppendEntryTimer();
    validateLogEntry(logEntry);

    // Decide whether the currently open segment has to be closed before this entry is appended.
    final LogSegment openSegment = cache.getOpenSegment();
    final boolean needsRoll;
    if (openSegment == null) {
      // No open segment yet: start one at this entry's index, both in the cache and in the worker.
      final long startIndex = logEntry.getIndex();
      cache.addOpenSegment(startIndex);
      fileLogWorker.startLogSegment(startIndex);
      needsRoll = false;
    } else if (isSegmentFull(openSegment, entryWithoutData)) {
      needsRoll = true;
    } else {
      final TermIndex lastTermIndex = openSegment.getLastTermIndex();
      final boolean termChanged = lastTermIndex != null && lastTermIndex.getTerm() != logEntry.getTerm();
      if (termChanged) {
        // A term change is only accepted when the term grows.
        Preconditions.assertTrue(lastTermIndex.getTerm() < logEntry.getTerm(),
            "open segment's term %s is larger than the new entry's term %s",
            lastTermIndex.getTerm(), logEntry.getTerm());
      }
      needsRoll = termChanged;
    }

    if (needsRoll) {
      cache.rollOpenSegment(true);
      fileLogWorker.rollLogSegment(openSegment);
      cacheEviction.signal();
    }

    // Ordering matters: when the entry carries state machine data, it has to be handed to the
    // state machine (through the worker) before it is put into the cache. Doing it the other way
    // round would leave a spurious entry in the cache.
    final Task writeTask = fileLogWorker.writeLogEntry(entryRef, entryWithoutData, context);
    if (!stateMachineCachingEnabled) {
      cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, entryRef);
    } else {
      // The stateMachineData will be cached inside the StateMachine itself, so only the entry
      // without that data goes into the log cache. If removeStateMachineData(..) handed back the
      // very same instance, a copy is cached instead of the original object.
      // NOTE(review): presumably the copy keeps the cached entry independent of entryRef,
      // which is released below -- confirm.
      final boolean dataRemoved = entryWithoutData != logEntry;
      final LogEntryProto toCache = dataRemoved ? entryWithoutData : LogProtoUtils.copy(entryWithoutData);
      cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
          ReferenceCountedObject.wrap(toCache));
    }

    // The timer is stopped when the write future completes, whether normally or exceptionally.
    return writeTask.getFuture().whenComplete((result, error) -> timer.stop());
  } catch (Exception e) {
    LOG.error("{}: Failed to append {}", getName(), toLogEntryString(logEntry), e);
    throw e;
  } finally {
    // Balances the retain() at the top of the method on every exit path.
    entryRef.release();
  }
}