SplitWALResult splitWAL()

in hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java [280:433]


  /**
   * Splits a single WAL file by reading its entries and handing every edit that still needs
   * replaying to the entry buffers consumed by the output sink's writer threads.
   * <p>
   * An entry is skipped when it is a replication marker edit, when its sequence id is at or below
   * the last flushed sequence id known for its region, or when it is a meta edit that the output
   * sink does not keep. A region whose directory is no longer present under the root is treated
   * as fully flushed, so all of its edits are skipped.
   * <p>
   * Progress is reported through {@code cancel} every
   * {@code hbase.splitlog.report.interval.loglines} edits. A non-positive value disables this
   * count-based reporting. Progress is also reported whenever more than
   * {@code hbase.splitlog.report.openedfiles} additional writers have been opened since the last
   * report.
   * @param walStatus status of the WAL to split; must describe a regular file
   * @param cancel    optional progress reporter (may be {@code null}); if its {@code progress()}
   *                  returns {@code false} the split stops early
   * @return a result whose first flag is {@code false} when the split was cancelled, or when
   *         closing the output sink returned {@code null}; its second flag is {@code true} when
   *         the WAL could not be parsed. A WAL for which no reader is obtained is reported as
   *         not cancelled and not corrupt.
   * @throws IOException on read/write failure. An {@link InterruptedException} is rethrown as an
   *                     {@link InterruptedIOException}, and a {@code RemoteException} is unwrapped
   *                     before being rethrown.
   */
  SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
    Path wal = walStatus.getPath();
    Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
    boolean corrupt = false;
    // Edits between progress reports. The modulo below is guarded so a non-positive value
    // disables count-based reporting instead of throwing ArithmeticException.
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    boolean outputSinkStarted = false;
    boolean cancelled = false;
    int editsCount = 0;
    int editsSkipped = 0;
    MonitoredTask status = TaskMonitor.get()
      .createStatus("Splitting " + wal + " to temporary staging area.", false, true);
    WALStreamReader walReader = null;
    this.fileBeingSplit = walStatus;
    long startTS = EnvironmentEdgeManager.currentTime();
    long length = walStatus.getLen();
    String lengthStr = StringUtils.humanSize(length);
    createOutputSinkAndEntryBuffers();
    try {
      String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
      LOG.info(logStr);
      status.setStatus(logStr);
      if (cancel != null && !cancel.progress()) {
        cancelled = true;
        return new SplitWALResult(false, corrupt);
      }
      walReader = getReader(walStatus, this.skipErrors, cancel);
      if (walReader == null) {
        LOG.warn("Nothing in {}; empty?", wal);
        return new SplitWALResult(true, corrupt);
      }
      LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(cancel);
      outputSink.setStatus(status);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      // Restart the clock so the summary's processing time excludes opening the reader.
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
        if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
          // Skip processing the replication marker edits.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Ignoring Replication marker edits.");
          }
          continue;
        }
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        // Resolve the region's last flushed sequence id once, then cache it for later entries.
        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (
            !(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))
          ) {
            // The region directory itself is not present in the FS. This indicates that
            // the region/table is already removed. We can just skip all the edits for this
            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
            // will get skipped by the seqId check below.
            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
            LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
            lastFlushedSequenceId = Long.MAX_VALUE;
          } else {
            if (sequenceIdChecker != null) {
              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                  storeSeqId.getSequenceId());
              }
              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
                  + TextFormat.shortDebugString(ids));
              }
            }
            // No checker available: -1 keeps every edit of this region.
            if (lastFlushedSequenceId == null) {
              lastFlushedSequenceId = -1L;
            }
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        editsCount++;
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        boolean intervalReached = interval > 0 && editsCount % interval == 0;
        if (intervalReached || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
            + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (cancel != null && !cancel.progress()) {
            cancelled = true;
            return new SplitWALResult(false, corrupt);
          }
        }
      }
    } catch (InterruptedException ie) {
      // NOTE(review): the thread's interrupt status is not restored here; restoring it before the
      // finally block closes the output sink could affect that close - confirm before changing.
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupt WAL={}", wal, e);
      // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
      // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
      if (this.splitLogWorkerCoordination != null) {
        // Some tests pass in a csm of null.
        splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
      }
      corrupt = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      final String log = "Finishing writing output for " + wal + " so closing down";
      LOG.debug(log);
      status.setStatus(log);
      if (null != walReader) {
        walReader.close();
      }
      try {
        if (outputSinkStarted) {
          // A cancellation detected inside the loop has already produced a "not finished" return
          // value; remember it so the summary below cannot contradict that result when the sink
          // nevertheless closes cleanly.
          boolean cancelledBeforeClose = cancelled;
          // Set cancelled to true as the immediate following statement will reset its value.
          // If close() throws an exception, cancelled will have the right value
          cancelled = true;
          // close() must stay on the left of || so it is always invoked.
          cancelled = outputSink.close() == null || cancelledBeforeClose;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across "
          + outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost
          + " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + ", length="
          + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
        LOG.info(msg);
        status.markComplete(msg);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
        }
      }
    }
    // Only reached when the try block did not return, i.e. no cancellation was seen before the
    // finally block; cancelled here reflects only the outcome of closing the output sink.
    return new SplitWALResult(!cancelled, corrupt);
  }