in server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java [316:471]
public CompactionStats call()
throws IOException, CompactionCanceledException, InterruptedException {
FileSKVWriter mfw = null;
CompactionStats majCStats = new CompactionStats();
startTime = Timer.startNew();
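// Track this compaction in the set of running compactions. Remember whether this call added it
// so the finally block only removes entries it added.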
boolean remove = runningCompactions.add(this);
String threadStartDate = dateFormatter.format(new Date());
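// Reset the entry counters for this compaction; they are rolled into the global counts in the
// finally block below.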
clearCurrentEntryCounts();
String oldThreadName = Thread.currentThread().getName();
String newThreadName =
"MajC compacting " + extent + " started " + threadStartDate + " file: " + outputFile;
Thread.currentThread().setName(newThreadName);
// Use try-with-resources for clearing the thread instead of a finally block because clearing
// may throw an exception. An exception thrown from a finally block would replace any exception
// already in flight, whereas try-with-resources attaches it as a suppressed exception.
try (var ignored = setThread()) {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
// Normally you would not want the DataNode to continue to
// cache blocks in the page cache for compaction input files,
// as these files are normally marked for deletion after a
// compaction occurs. However, there can be cases where the
// compaction input files will continue to be used, such as
// bulk import files that may be assigned to many tablets and
// will still be needed until all of those tablets have
// compacted, or cloned tables where one of the tables has
// compacted the input file but the other has not.
final String dropCachePrefixProperty =
acuTableConf.get(Property.TABLE_COMPACTION_INPUT_DROP_CACHE_BEHIND);
final EnumSet<FilePrefix> dropCacheFileTypes =
ConfigurationTypeHelper.getDropCacheBehindFilePrefixes(dropCachePrefixProperty);
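// Never drop the page cache behind output written for the root or metadata tables. For all
// other tables, honor the minor or major compaction output drop-cache setting as appropriate.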
final boolean isMinC = env.getIteratorScope() == IteratorUtil.IteratorScope.minc;
final boolean dropCacheBehindOutput =
!SystemTables.ROOT.tableId().equals(this.extent.tableId())
&& !SystemTables.METADATA.tableId().equals(this.extent.tableId())
&& ((isMinC && acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE))
|| (!isMinC && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE)));
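// Build the writer for the compaction output file using the table's configuration, adding the
// drop-cache-behind hint when it applies.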
WriterBuilder outBuilder =
fileFactory.newWriterBuilder().forFile(outputFile, ns, ns.getConf(), cryptoService)
.withTableConfiguration(acuTableConf);
if (dropCacheBehindOutput) {
outBuilder.dropCachesBehind();
}
mfw = outBuilder.build();
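// Look up the configured locality groups so each group can be written to its own section of
// the output file.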
Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);
long t1 = System.currentTimeMillis();
HashSet<ByteSequence> allColumnFamilies = new HashSet<>();
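// If the output format supports locality groups, compact each configured group first and
// track the column families those groups cover.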
if (mfw.supportsLocalityGroups()) {
for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
setLocalityGroup(entry.getKey());
compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats,
dropCacheFileTypes);
allColumnFamilies.addAll(entry.getValue());
}
}
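// Compact the default locality group last, excluding the column families already written to
// the named groups above.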
setLocalityGroup("");
compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats, dropCacheFileTypes);
long t2 = System.currentTimeMillis();
FileSKVWriter mfwTmp = mfw;
mfw = null; // set this to null so we do not try to close it again in finally if the close
// fails
try {
mfwTmp.close(); // if the close fails it will cause the compaction to fail
} catch (IOException ex) {
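// The close failed, so attempt to remove the partially written output file before rethrowing.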
if (!fs.deleteRecursively(outputFile.getPath())) {
if (fs.exists(outputFile.getPath())) {
log.error("Unable to delete {}", outputFile);
}
}
throw ex;
}
log.trace(String.format(
"Compaction %s %,d read | %,d written | %,6d entries/sec"
+ " | %,6.3f secs | %,12d bytes | %9.3f byte/sec | %,d paused",
extent, majCStats.getEntriesRead(), majCStats.getEntriesWritten(),
(int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0,
mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0),
majCStats.getTimesPaused()));
majCStats.setFileSize(mfwTmp.getLength());
return majCStats;
} catch (CompactionCanceledException e) {
log.debug("Compaction canceled {}", extent);
throw e;
} catch (IterationInterruptedException iie) {
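// An interrupted read typically means the compaction was canceled; if compactions are no
// longer enabled for this tablet, report it as a cancellation rather than an error.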
if (!env.isCompactionEnabled()) {
log.debug("Compaction canceled {}", extent);
throw new CompactionCanceledException();
}
log.debug("RFile interrupted {}", extent);
throw iie;
} catch (IOException | RuntimeException e) {
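// Log the compaction inputs, output, and iterators to make the failure easier to diagnose.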
Collection<String> inputFileNames =
Collections2.transform(getFilesToCompact(), StoredTabletFile::getFileName);
String outputFileName = outputFile.getFileName();
log.error(
"Compaction error. Compaction info: "
+ "extent: {}, input files: {}, output file: {}, iterators: {}, start date: {}",
getExtent(), inputFileNames, outputFileName, getIterators(), threadStartDate, e);
throw e;
} finally {
Thread.currentThread().setName(oldThreadName);
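// Only remove this compaction from the running set if this call added it.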
if (remove) {
runningCompactions.remove(this);
}
updateGlobalEntryCounts();
try {
if (mfw != null) {
// compaction must not have finished successfully, so close its output file
try {
mfw.close();
} finally {
if (!fs.deleteRecursively(outputFile.getPath())) {
if (fs.exists(outputFile.getPath())) {
log.error("Unable to delete {}", outputFile);
}
}
}
}
} catch (IOException | RuntimeException e) {
/*
 * If compaction is enabled then the compaction didn't finish due to a real error condition,
 * so log any errors from closing the output file at warn level. However, if compaction is not
 * enabled, then it was canceled due to something like a tablet split, user cancellation, or
 * table deletion, which is not an error; log any errors from closing the output file at debug
 * level since they may simply result from an InterruptedException thrown by the cancellation.
 */
if (env.isCompactionEnabled()) {
log.warn("{}", e.getMessage(), e);
} else {
log.debug("{}", e.getMessage(), e);
}
}
}
}