in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java [601:697]
private CompletableFuture<Message> processArchiveLog(
LogServiceRequestProto logServiceRequestProto) {
LogServiceProtos.ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog();
LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
Throwable t = null;
try {
String loc = null;
this.isArchivalRequest = !archiveLog.getIsExport();
if (isArchivalRequest) {
loc = archivalInfo.getArchiveLocation();
archivalInfo.updateArchivalInfo(archiveLog);
} else {
loc = archiveLog.getLocation();
ArchivalInfo exportInfo =
exportMap.putIfAbsent(loc, new ArchivalInfo(loc));
if (exportInfo != null) {
if (exportInfo.getLastArchivedIndex() == archiveLog
.getLastArchivedRaftIndex()) {
throw new IllegalStateException("Export of " + logName + "for the given location " + loc
+ "is already present and in " + exportInfo.getStatus());
} else {
exportInfo.updateArchivalInfo(archiveLog);
}
}
}
if (loc == null) {
throw new IllegalArgumentException(isArchivalRequest ?
"Location for archive is not configured" :
"Location for export provided is null");
}
final String location = loc;
long recordId = archiveLog.getLastArchivedRaftIndex();
if (isArchivalRequest) {
t = verifyState(State.CLOSED);
} else {
t = verifyState(State.OPEN, State.CLOSED);
}
if (t == null) {
Callable<Boolean> callable = () -> {
final Timer.Context timerContext = archiveLogTimer.time();
try {
startArchival(recordId, logName, location);
//Init ArchiveLogWriter for writing in export/archival location
ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
writer.init(location, logName);
LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
reader.seek(recordId);
long records = 0;
boolean isInterrupted = false;
while (reader.hasNext()) {
writer.write(ByteBuffer.wrap(reader.next()));
isInterrupted = Thread.currentThread().isInterrupted();
if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE || isInterrupted) {
//roll writer when interuppted or no. of records threshold per file is met
commit(writer, logName, location);
if (isInterrupted) {
break;
}
records = 0;
}
records++;
}
writer.close();
if (!isInterrupted) {
//It means archival is successfully completed on this leader
completeArchival(writer.getLastWrittenRecordId(), logName, location);
} else {
//Thread is interuppted either leader is going down or it become follower
try {
//Sleeping here before sending archival request to new leader to
// avoid causing problem during leader election storm
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName, location);
}
return true;
} catch (Exception e) {
LOG.error("Archival failed for the log:" + logName, e);
failArchival(recordId, logName, location);
} finally {
timerContext.stop();
}
return false;
};
archiveExportFutures.put(location, executorService.submit(callable));
}
}catch (Exception e){
LOG.warn("Exception while processing archival request for " + logName, e);
t = e;
}
return CompletableFuture.completedFuture(
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
}