in hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java [280:433]
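/**
 * Split a single WAL file: iterate its entries, skip edits that are already persisted (at or
 * below the region's last flushed sequence id) or that belong to regions no longer on the
 * filesystem, and hand the remainder to the output sink to be written as recovered edits.
 * @return a SplitWALResult whose "finished" flag is false if the split was cancelled, and whose
 *         "corrupt" flag is true if the WAL could not be parsed.
 */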
SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
Path wal = walStatus.getPath();
Preconditions.checkArgument(walStatus.isFile(), "Not a regular file %s", wal);
boolean corrupt = false;
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
boolean outputSinkStarted = false;
boolean cancelled = false;
int editsCount = 0;
int editsSkipped = 0;
MonitoredTask status = TaskMonitor.get()
.createStatus("Splitting " + wal + " to temporary staging area.", false, true);
WALStreamReader walReader = null;
this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime();
long length = walStatus.getLen();
String lengthStr = StringUtils.humanSize(length);
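// Create the output sink and the entry buffers that its writer threads will drain.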
createOutputSinkAndEntryBuffers();
try {
String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + " bytes)";
LOG.info(logStr);
status.setStatus(logStr);
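// Bail out before opening the file if the caller has already cancelled.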
if (cancel != null && !cancel.progress()) {
cancelled = true;
return new SplitWALResult(false, corrupt);
}
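// Open a reader on the WAL; getReader returns null if there is nothing to read.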
walReader = getReader(walStatus, this.skipErrors, cancel);
if (walReader == null) {
LOG.warn("Nothing in {}; empty?", wal);
return new SplitWALResult(true, corrupt);
}
LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
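// Hook the sink up to progress reporting and status, then start its writer threads.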
outputSink.setReporter(cancel);
outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
startTS = EnvironmentEdgeManager.currentTime();
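// Main loop: read entries one at a time and route each to the appropriate sink buffer.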
while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
// Skip processing the replication marker edits.
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring Replication marker edits.");
}
continue;
}
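// Look up the region's last flushed sequence id; edits at or below it were already
// persisted and can be skipped.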
byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region);
Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
if (!isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr)) {
// The region directory itself is not present in the FS, which indicates that the
// region/table has already been removed, so we can skip all edits for this region.
// Setting lastFlushedSequenceId to Long.MAX_VALUE makes every edit fail the seqId
// check below and get skipped.
// See more details at https://issues.apache.org/jira/browse/HBASE-24189
LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
lastFlushedSequenceId = Long.MAX_VALUE;
} else {
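// The region still exists; if a sequence id checker is available, fetch the region's
// last flushed sequence ids, both overall and per store.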
if (sequenceIdChecker != null) {
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
storeSeqId.getSequenceId());
}
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+ TextFormat.shortDebugString(ids));
}
}
if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L;
}
}
lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
}
editsCount++;
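// Skip edits already covered by a flush; they are persisted in store files.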
if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
editsSkipped++;
continue;
}
// Don't send Compaction/Close/Open region events to recovered edit type sinks.
if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
editsSkipped++;
continue;
}
entryBuffers.appendEntry(entry);
int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
// If sufficient edits have passed, check if we should report progress.
if (editsCount % interval == 0 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
numOpenedFilesLastCheck = this.getNumOpenWriters();
String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
+ " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
if (cancel != null && !cancel.progress()) {
cancelled = true;
return new SplitWALResult(false, corrupt);
}
}
}
} catch (InterruptedException ie) {
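// Surface thread interruption as an InterruptedIOException, since this method only
// declares IOException.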
IOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupt WAL={}", wal, e);
// If splitLogWorkerCoordination is set, this is old-school zk-coordinated splitting, so
// update zk. Otherwise it is the newer procedure-based WAL split, which has no zk component.
if (this.splitLogWorkerCoordination != null) {
// Some tests pass in a null csm (coordinated state manager), so splitLogWorkerCoordination
// can be null here.
splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
}
corrupt = true;
} catch (IOException e) {
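// Unwrap RemoteException so the caller sees the real server-side cause.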
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
throw e;
} finally {
final String log = "Finishing writing output for " + wal + "; closing down";
LOG.debug(log);
status.setStatus(log);
if (walReader != null) {
walReader.close();
}
try {
if (outputSinkStarted) {
// Set cancelled to true first: the immediately following statement will reset it, and if
// close() throws an exception, cancelled keeps the right (true) value.
cancelled = true;
cancelled = outputSink.close() == null;
}
} finally {
long processCost = EnvironmentEdgeManager.currentTime() - startTS;
// See if length got updated post lease recovery
String msg = "Processed " + editsCount + " edits across "
+ outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost
+ " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + ", length="
+ length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
LOG.info(msg);
status.markComplete(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
}
}
}
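// The split finished only if it was never cancelled; report corruption alongside.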
return new SplitWALResult(!cancelled, corrupt);
}