in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java [5680:5868]
private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, FileSystem fs) throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
status.setStatus("Opening recovered edits");
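// Read the recovered edits file sequentially; try-with-resources guarantees the reader is
// closed even if replay fails part way through.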
try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
WAL.Entry entry;
HStore store = null;
boolean reportedOnce = false;
ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
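// Replaying nonces from the WAL lets the nonce manager detect client operations that were
// retried across the crash, so they are not applied twice.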
try {
// How many edits seen before we check elapsed time
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
// How often to send a progress report (default 1/2 master timeout)
int period = this.conf.getInt("hbase.hstore.report.period", 300000);
long lastReport = EnvironmentEdgeManager.currentTime();
if (coprocessorHost != null) {
coprocessorHost.preReplayWALs(this.getRegionInfo(), edits);
}
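// Main replay loop: each WAL entry pairs a key (region, sequence id, nonce, write time)
// with an edit holding the batch of cells to re-apply.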
while ((entry = reader.next()) != null) {
WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (ng != null) { // ng is null in some tests, or when nonces are disabled
ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
}
if (reporter != null) {
intervalEdits += val.size();
if (intervalEdits >= interval) {
// Number of edits interval reached
intervalEdits = 0;
long cur = EnvironmentEdgeManager.currentTime();
if (lastReport + period <= cur) {
status.setStatus(
"Replaying edits..." + " skipped=" + skippedEdits + " edits=" + editsCount);
// Timeout reached
if (!reporter.progress()) {
msg = "Progressable reporter failed, stopping replay for region " + this;
LOG.warn(msg);
status.abort(msg);
throw new IOException(msg);
}
reportedOnce = true;
lastReport = cur;
}
}
}
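// Remember the first sequence id in the file; it is reported in the completion message.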
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getSequenceId();
}
if (currentEditSeqId > key.getSequenceId()) {
// when this condition is true, it means we have a serious defect because we need to
// maintain increasing SeqId for WAL edits per region
LOG.error(getRegionInfo().getEncodedName() + " : " + "Found decreasing SeqId. PreId="
+ currentEditSeqId + " key=" + key + "; edit=" + val);
} else {
currentEditSeqId = key.getSequenceId();
}
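// Entries rewritten during a previous replay carry the sequence id from the original WAL
// (origLogSeqNum); prefer it so restored cells keep their original sequence ids.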
currentReplaySeqId =
(key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId;
boolean checkRowWithinBoundary = false;
// Check this edit is for this region.
if (
!Bytes.equals(key.getEncodedRegionName(), this.getRegionInfo().getEncodedNameAsBytes())
) {
checkRowWithinBoundary = true;
}
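// A mismatched encoded region name is not an immediate skip; each cell's row is instead
// validated against this region's boundaries below.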
boolean flush = false;
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
for (Cell c : val.getCells()) {
assert c instanceof ExtendedCell;
ExtendedCell cell = (ExtendedCell) c;
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (WALEdit.isMetaEditFamily(cell)) {
// if region names don't match, skip replaying the compaction marker
if (!checkRowWithinBoundary) {
// this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
// replay the compaction
replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
}
}
skippedEdits++;
continue;
}
// Figure which store the edit is meant for.
if (
store == null
|| !CellUtil.matchingFamily(cell, store.getColumnFamilyDescriptor().getName())
) {
store = getStore(cell);
}
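// The resolved store is cached across cells since consecutive cells usually share a
// family; a null store means the family no longer exists in the schema.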
if (store == null) {
// This should never happen. Perhaps schema was changed between
// crash and redeploy?
LOG.warn("No family for cell {} in region {}", cell, this);
skippedEdits++;
continue;
}
if (
checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(), cell.getRowArray(),
cell.getRowOffset(), cell.getRowLength())
) {
LOG.warn("Row of {} is not within region boundary for region {}", cell, this);
skippedEdits++;
continue;
}
// Skip edits the stores have already persisted: anything at or below this store's max
// flushed sequence id is already in an HFile.
if (
key.getSequenceId()
<= maxSeqIdInStores.get(store.getColumnFamilyDescriptor().getName())
) {
skippedEdits++;
continue;
}
PrivateCellUtil.setSequenceId(cell, currentReplaySeqId);
restoreEdit(store, cell, memStoreSizing);
editsCount++;
}
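// Fold the size of the replayed cells into the region's memstore accounting, and flush
// if the configured flush threshold has been crossed.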
MemStoreSize mss = memStoreSizing.getMemStoreSize();
incMemStoreSize(mss);
flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
}
}
if (coprocessorHost != null) {
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
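// Move the truncated file aside rather than failing the open; any missing tail data is
// expected to be present in another recovered edits file.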
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ "wal splitting, so we have this data in another edit. Continuing, but renaming "
+ edits + " as " + p + " for region " + this;
LOG.warn(msg, eof);
status.abort(msg);
} else {
LOG.warn("EOF while replaying recover edits and config '{}' is true so "
+ "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
}
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "File corruption encountered! Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
status.setStatus(msg);
} else {
status.abort(StringUtils.stringifyException(ioe));
// other IO errors may be transient (bad network connection,
// checksum exception on one datanode, etc). throw & retry
throw ioe;
}
}
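// Guarantee at least one progress report for small edit files that never reached the
// reporting interval inside the loop.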
if (reporter != null && !reportedOnce) {
reporter.progress();
}
msg = "Applied " + editsCount + ", skipped " + skippedEdits + ", firstSequenceIdInLog="
+ firstSeqIdInLog + ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
status.markComplete(msg);
LOG.debug(msg);
return currentEditSeqId;
} finally {
status.cleanup();
}
}