private long replayRecoveredEdits()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java [5680:5868]


  /**
   * Replays the entries of a single recovered-edits file into this region's stores.
   * <p>
   * Each WAL entry read from {@code edits} is walked cell by cell. A cell is skipped (and counted
   * as skipped) when it belongs to the meta edit family, when no store exists for its family, when
   * the entry was written under a different encoded region name and the row is outside this
   * region's range, or when the entry's sequence id is not greater than the value recorded for the
   * store in {@code maxSeqIdInStores}. Every other cell is stamped with the replay sequence id and
   * handed to {@code restoreEdit}. After each entry the accumulated memstore size is added to the
   * region and a flush is run if the flush size has been reached.
   * <p>
   * An {@link EOFException}, or an {@link IOException} whose cause is a {@link ParseException}, does
   * not fail the replay: the file is moved aside (for EOF only when
   * {@code RECOVERED_EDITS_IGNORE_EOF} is not set to true) and whatever was applied so far is
   * kept. Any other {@link IOException} is rethrown so the caller can retry.
   * @param edits            path of the recovered-edits file to replay
   * @param maxSeqIdInStores per column family name, the sequence id at or below which edits are
   *                         skipped for that store
   * @param reporter         progress callback, may be {@code null}; if its {@code progress()} call
   *                         returns false during replay the replay is aborted
   * @param fs               file system used to open the reader on {@code edits}
   * @return the highest sequence id seen in the file, or {@code -1} if no entry was read
   * @throws IOException if the reporter signals failure, or on an I/O error that is neither an EOF
   *                     nor caused by a {@link ParseException}
   */
  private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, FileSystem fs) throws IOException {
    String msg = "Replaying edits from " + edits;
    LOG.info(msg);
    MonitoredTask status = TaskMonitor.get().createStatus(msg);

    status.setStatus("Opening recovered edits");
    try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
      long currentEditSeqId = -1;
      long currentReplaySeqId = -1;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      WAL.Entry entry;
      // Cached across cells and entries so consecutive cells of one family skip the lookup.
      HStore store = null;
      boolean reportedOnce = false;
      ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

      try {
        // How many edits seen before we check elapsed time
        int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
        // How often to send a progress report (default 1/2 master timeout)
        int period = this.conf.getInt("hbase.hstore.report.period", 300000);
        long lastReport = EnvironmentEdgeManager.currentTime();

        if (coprocessorHost != null) {
          coprocessorHost.preReplayWALs(this.getRegionInfo(), edits);
        }

        while ((entry = reader.next()) != null) {
          WALKey key = entry.getKey();
          WALEdit val = entry.getEdit();

          if (ng != null) { // some test, or nonces disabled
            ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
          }

          if (reporter != null) {
            intervalEdits += val.size();
            if (intervalEdits >= interval) {
              // Number of edits interval reached
              intervalEdits = 0;
              long cur = EnvironmentEdgeManager.currentTime();
              if (lastReport + period <= cur) {
                status.setStatus(
                  "Replaying edits..." + " skipped=" + skippedEdits + " edits=" + editsCount);
                // Timeout reached
                if (!reporter.progress()) {
                  msg = "Progressable reporter failed, stopping replay for region " + this;
                  LOG.warn(msg);
                  status.abort(msg);
                  throw new IOException(msg);
                }
                reportedOnce = true;
                lastReport = cur;
              }
            }
          }

          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getSequenceId();
          }
          if (currentEditSeqId > key.getSequenceId()) {
            // when this condition is true, it means we have a serious defect because we need to
            // maintain increasing SeqId for WAL edits per region
            LOG.error(getRegionInfo().getEncodedName() + " : " + "Found decreasing SeqId. PreId="
              + currentEditSeqId + " key=" + key + "; edit=" + val);
          } else {
            currentEditSeqId = key.getSequenceId();
          }
          // Prefer the original log sequence number when the key carries one.
          currentReplaySeqId =
            (key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId;

          boolean checkRowWithinBoundary = false;
          // Check this edit is for this region.
          if (
            !Bytes.equals(key.getEncodedRegionName(), this.getRegionInfo().getEncodedNameAsBytes())
          ) {
            // Entry was written under another region name: only accept rows inside our range.
            checkRowWithinBoundary = true;
          }

          // Per-entry accumulator; folded into the region's memstore size after the cell loop.
          MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
          for (Cell c : val.getCells()) {
            assert c instanceof ExtendedCell;
            ExtendedCell cell = (ExtendedCell) c;
            // Check this edit is for me. Also, guard against writing the special
            // METACOLUMN info such as HBASE::CACHEFLUSH entries
            if (WALEdit.isMetaEditFamily(cell)) {
              // if region names don't match, skip replaying compaction marker
              if (!checkRowWithinBoundary) {
                // this is a special edit, we should handle it
                CompactionDescriptor compaction = WALEdit.getCompaction(cell);
                if (compaction != null) {
                  // replay the compaction
                  replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
                }
              }
              skippedEdits++;
              continue;
            }
            // Figure which store the edit is meant for.
            if (
              store == null
                || !CellUtil.matchingFamily(cell, store.getColumnFamilyDescriptor().getName())
            ) {
              store = getStore(cell);
            }
            if (store == null) {
              // This should never happen. Perhaps schema was changed between
              // crash and redeploy?
              LOG.warn("No family for cell {} in region {}", cell, this);
              skippedEdits++;
              continue;
            }
            if (
              checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(), cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())
            ) {
              LOG.warn("Row of {} is not within region boundary for region {}", cell, this);
              skippedEdits++;
              continue;
            }
            // Now, figure if we should skip this edit.
            if (
              key.getSequenceId()
                  <= maxSeqIdInStores.get(store.getColumnFamilyDescriptor().getName())
            ) {
              skippedEdits++;
              continue;
            }
            PrivateCellUtil.setSequenceId(cell, currentReplaySeqId);

            restoreEdit(store, cell, memStoreSizing);
            editsCount++;
          }
          MemStoreSize mss = memStoreSizing.getMemStoreSize();
          incMemStoreSize(mss);
          // Flush decision is based on the region-wide memstore size, not this entry's delta.
          if (isFlushSize(this.memStoreSizing.getMemStoreSize())) {
            internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
              FlushLifeCycleTracker.DUMMY);
          }
        }

        if (coprocessorHost != null) {
          coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
        }
      } catch (EOFException eof) {
        if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
          Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
          msg = "Encountered EOF. Most likely due to Master failure during "
            + "wal splitting, so we have this data in another edit. Continuing, but renaming "
            + edits + " as " + p + " for region " + this;
          LOG.warn(msg, eof);
          status.abort(msg);
        } else {
          LOG.warn("EOF while replaying recover edits and config '{}' is true so "
            + "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
        }
      } catch (IOException ioe) {
        // If the IOE resulted from bad file format,
        // then this problem is idempotent and retrying won't help
        if (ioe.getCause() instanceof ParseException) {
          Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
          msg =
            "File corruption encountered!  " + "Continuing, but renaming " + edits + " as " + p;
          LOG.warn(msg, ioe);
          status.setStatus(msg);
        } else {
          status.abort(StringUtils.stringifyException(ioe));
          // other IO errors may be transient (bad network connection,
          // checksum exception on one datanode, etc). throw & retry
          throw ioe;
        }
      }
      // Make sure the reporter hears from us at least once, even for short files.
      if (reporter != null && !reportedOnce) {
        reporter.progress();
      }
      msg = "Applied " + editsCount + ", skipped " + skippedEdits + ", firstSequenceIdInLog="
        + firstSeqIdInLog + ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
      status.markComplete(msg);
      LOG.debug(msg);
      return currentEditSeqId;
    } finally {
      status.cleanup();
    }
  }