in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java [383:426]
/**
 * Serves a ReadNextQuery: reads up to the requested number of records, starting at the
 * requested record id, and packs either the records or the failure into a read-log reply.
 *
 * <p>Records are read through a {@code LogServiceRaftLogReader} while the log is OPEN,
 * CLOSED or ARCHIVING, and through an {@code ArchiveHdfsLogReader} once it is ARCHIVED.
 *
 * @param proto request whose ReadNextQuery carries the start record id and record count
 * @return an already-completed future holding the serialized reply; on failure the reply
 *         is built from a null record list and the error
 */
private CompletableFuture<Message> processReadRequest(LogServiceRequestProto proto) {
  ReadLogRequestProto msgProto = proto.getReadNextQuery();
  // Record id the caller wants to start reading at.
  long startRecordId = msgProto.getStartRecordId();
  // Upper bound on the number of records to return; fewer are returned if the reader runs out.
  int numRecordsToRead = msgProto.getNumRecords();
  // Reads are permitted in every state accepted below: OPEN, CLOSED and ARCHIVING are
  // served by the Raft log reader, ARCHIVED by the archive reader.
  Throwable t = verifyState(State.OPEN, State.ARCHIVING, State.CLOSED, State.ARCHIVED);
  List<byte[]> list = null;
  if (t == null) {
    // Snapshot the state once so every branch below decides on the same value even if the
    // field is updated concurrently.
    final State current = this.state;
    try {
      // NOTE(review): the reader is never closed here; whether the archive reader holds a
      // resource that needs an explicit close is not visible from this method -- confirm.
      RaftLogReader reader = null;
      if (current == State.OPEN || current == State.CLOSED || current == State.ARCHIVING) {
        reader = new LogServiceRaftLogReader(log);
      } else if (current == State.ARCHIVED) {
        reader = new ArchiveHdfsLogReader(LogServiceUtils
            .getArchiveLocationForLog(archivalInfo.getArchiveLocation(),
                archivalInfo.getArchiveLogName()));
      } else {
        // The state changed between the verification above and the snapshot. Re-verify
        // against the same set of states so the caller gets the regular state error.
        t = verifyState(State.OPEN, State.ARCHIVING, State.CLOSED, State.ARCHIVED);
        if (t == null) {
          // The state flipped back in the meantime; still report a failure rather than
          // replying with neither records nor an error.
          t = new IllegalStateException(
              "Log state changed while executing ReadNextQuery: " + current);
        }
      }
      if (reader != null) {
        list = new ArrayList<>();
        reader.seek(startRecordId);
        for (int i = 0; i < numRecordsToRead && reader.hasNext(); i++) {
          list.add(reader.next());
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to execute ReadNextQuery", e);
      t = e;
      // Drop any partially read records so the reply carries only the error.
      list = null;
    }
  }
  return CompletableFuture.completedFuture(
      Message.valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, t).toByteString()));
}