private CompletableFuture processReadRequest()

in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java [383:426]


  private CompletableFuture<Message> processReadRequest(LogServiceRequestProto proto) {
    ReadLogRequestProto msgProto = proto.getReadNextQuery();
    // Get the recordId the user wants to start reading at
    long startRecordId = msgProto.getStartRecordId();
    // And the number of records they want to read
    int numRecordsToRead = msgProto.getNumRecords();
    //Log must have been closed while Archiving , so we can let user only to
    // read when the log is either OPEN or ARCHIVED
    Throwable t = verifyState(State.OPEN, State.ARCHIVING, State.CLOSED, State.ARCHIVED);
    List<byte[]> list = null;

    if (t == null) {
      RaftLogReader reader = null;
      try {
        if (this.state == State.OPEN || this.state == State.CLOSED
            || this.state == State.ARCHIVING) {
          reader = new LogServiceRaftLogReader(log);
        } else if (this.state == State.ARCHIVED) {
          reader = new ArchiveHdfsLogReader(LogServiceUtils
              .getArchiveLocationForLog(archivalInfo.getArchiveLocation(),
                  archivalInfo.getArchiveLogName()));
        } else {
          //could be a race condition
          t = verifyState(State.OPEN, State.ARCHIVED);
        }
        if (t == null && reader != null) {
          list = new ArrayList<byte[]>();
          reader.seek(startRecordId);
          for (int i = 0; i < numRecordsToRead; i++) {
            if (!reader.hasNext()) {
              break;
            }
            list.add(reader.next());
          }
        }
      } catch (Exception e) {
        LOG.error("Failed to execute ReadNextQuery", e);
        t = e;
        list = null;
      }
    }
    return CompletableFuture.completedFuture(
      Message.valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, t).toByteString()));
  }