private CompletableFuture processArchiveLog()

in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java [601:697]


  private CompletableFuture<Message> processArchiveLog(
      LogServiceRequestProto logServiceRequestProto) {
    LogServiceProtos.ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog();
    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
    Throwable t = null;
    try {
      String loc = null;
      this.isArchivalRequest = !archiveLog.getIsExport();
      if (isArchivalRequest) {
        loc = archivalInfo.getArchiveLocation();
        archivalInfo.updateArchivalInfo(archiveLog);
      } else {
        loc = archiveLog.getLocation();
        ArchivalInfo exportInfo =
            exportMap.putIfAbsent(loc, new ArchivalInfo(loc));
        if (exportInfo != null) {
          if (exportInfo.getLastArchivedIndex() == archiveLog
              .getLastArchivedRaftIndex()) {
            throw new IllegalStateException("Export of " + logName + "for the given location " + loc
                + "is already present and in " + exportInfo.getStatus());
          } else {
            exportInfo.updateArchivalInfo(archiveLog);
          }
        }
      }
      if (loc == null) {
        throw new IllegalArgumentException(isArchivalRequest ?
            "Location for archive is not configured" :
            "Location for export provided is null");
      }
      final String location = loc;
      long recordId = archiveLog.getLastArchivedRaftIndex();
      if (isArchivalRequest) {
        t = verifyState(State.CLOSED);
      } else {
        t = verifyState(State.OPEN, State.CLOSED);
      }
      if (t == null) {
        Callable<Boolean> callable = () -> {
          final Timer.Context timerContext = archiveLogTimer.time();
          try {
            startArchival(recordId, logName, location);
            //Init ArchiveLogWriter for writing in export/archival location
            ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
            writer.init(location, logName);

            LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
            reader.seek(recordId);
            long records = 0;
            boolean isInterrupted = false;
            while (reader.hasNext()) {
              writer.write(ByteBuffer.wrap(reader.next()));
              isInterrupted = Thread.currentThread().isInterrupted();
              if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE || isInterrupted) {
                //roll writer when interuppted or no. of records threshold per file is met
                commit(writer, logName, location);
                if (isInterrupted) {
                  break;
                }
                records = 0;
              }
              records++;
            }
            writer.close();
            if (!isInterrupted) {
              //It means archival is successfully completed on this leader
              completeArchival(writer.getLastWrittenRecordId(), logName, location);
            } else {
              //Thread is interuppted either leader is going down or it become follower
              try {
                //Sleeping here before sending archival request to new leader to
                // avoid causing problem during leader election storm
                Thread.sleep(10000);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName, location);
            }
            return true;
          } catch (Exception e) {
            LOG.error("Archival failed for the log:" + logName, e);
            failArchival(recordId, logName, location);
          } finally {
            timerContext.stop();
          }
          return false;
        };

        archiveExportFutures.put(location, executorService.submit(callable));
      }
    }catch (Exception e){
      LOG.warn("Exception while processing archival request for " + logName, e);
      t = e;
    }
    return CompletableFuture.completedFuture(
        Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
  }