in server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java [147:343]
/**
 * Makes one pass over the tablets of {@code tableId} that overlap the range
 * {@code startRow}/{@code endRow} and, for each tablet, submits the conditional metadata update
 * (if any) needed to move the user compaction identified by {@code fateId} forward.
 *
 * <p>
 * Each tablet falls into exactly one of the following cases, checked in this order:
 * <ol>
 * <li>its compacted set already contains {@code fateId}: counted as complete</li>
 * <li>it has an operation id set: ignored for this pass</li>
 * <li>it has no files: a conditional mutation marking it compacted is submitted</li>
 * <li>it has no selected files and no external compactions: the configured selector is run; if
 * it picks nothing the tablet is marked compacted, otherwise the picked files are written as the
 * tablet's selected files (and this operation's user-compaction-requested marker, if present, is
 * deleted)</li>
 * <li>it already has selected files (from this or another fate operation): nothing is
 * submitted</li>
 * <li>it has external compactions: a user-compaction-requested marker for {@code fateId} is
 * added unless already present</li>
 * </ol>
 *
 * <p>
 * If files were selected for at least one tablet, an event covering the range of those tablets
 * is sent to the manager's event coordinator.
 *
 * @param manager source of the server context, steady time and event coordinator
 * @param fateId id of the fate operation driving this compaction
 * @return number of tablets seen minus the number that were already marked compacted for
 *         {@code fateId} when read; tablets that only had an update submitted during this pass
 *         are not counted as complete
 * @throws AcceptableThriftTableOperationException if selecting files for a tablet fails
 * @throws IllegalStateException if no tablets were seen in the range
 */
public int updateAndCheckTablets(Manager manager, FateId fateId)
    throws AcceptableThriftTableOperationException {

  var ample = manager.getContext().getAmple();

  // This map tracks tablets that had a conditional mutation submitted to select files. If the
  // conditional mutation is successful then want to log a message. Use a concurrent map as the
  // result consumer may run in another thread.
  ConcurrentHashMap<KeyExtent,Set<StoredTabletFile>> selectionsSubmitted =
      new ConcurrentHashMap<>();

  Consumer<Ample.ConditionalResult> resultConsumer = result -> {
    if (result.getStatus() == Status.REJECTED) {
      log.debug("{} update for {} was rejected ", fateId, result.getExtent());
    }

    // always remove extents from the map even if not successful in order to avoid placing too
    // many in memory
    var submittedFiles = selectionsSubmitted.remove(result.getExtent());
    if (submittedFiles != null && result.getStatus() == Status.ACCEPTED) {
      // successfully selected files so log this
      TabletLogger.selected(fateId, result.getExtent(), submittedFiles);
    }
  };

  long t1 = System.currentTimeMillis();

  // per-case counters, only used for the summary debug message and the return value
  int complete = 0;
  int total = 0;
  int opidsSeen = 0;
  int noFiles = 0;
  int noneSelected = 0;
  int alreadySelected = 0;
  int otherSelected = 0;
  int userCompactionRequested = 0;
  int userCompactionWaiting = 0;
  int selected = 0;

  // smallest and largest extents for which a file selection was submitted in this pass
  KeyExtent minSelected = null;
  KeyExtent maxSelected = null;

  try (
      var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow)
          .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID, USER_COMPACTION_REQUESTED)
          .checkConsistency().build();
      var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) {

    CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), fateId);

    for (TabletMetadata tablet : tablets) {

      total++;

      if (tablet.getCompacted().contains(fateId)) {
        // this tablet is already considered done
        log.trace("{} compaction for {} is complete", fateId, tablet.getExtent());
        complete++;
      } else if (tablet.getOperationId() != null) {
        log.trace("{} ignoring tablet {} with active operation {} ", fateId, tablet.getExtent(),
            tablet.getOperationId());
        opidsSeen++;
      } else if (tablet.getFiles().isEmpty()) {
        log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateId,
            tablet.getExtent());
        // this tablet has no files try to mark it as done
        tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
            .requireSame(tablet, FILES, COMPACTED).putCompacted(fateId)
            .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId),
                () -> "no files, attempting to mark as compacted. " + fateId);
        noFiles++;
      } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) {
        // there are no selected files
        log.trace("{} selecting {} files compaction for {}", fateId, tablet.getFiles().size(),
            tablet.getExtent());

        Set<StoredTabletFile> filesToCompact;
        try {
          filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(),
              tablet.getExtent(), config, tablet.getFilesMap());
        } catch (Exception e) {
          // the underlying failure is logged here with its stack trace; the exception thrown
          // below only carries a short description
          log.warn("{} failed to select files for {} using {}", fateId, tablet.getExtent(),
              config.getSelector(), e);
          throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
              TableOperation.COMPACT, TableOperationExceptionType.OTHER,
              "Failed to select files");
        }

        if (log.isTraceEnabled()) {
          log.trace("{} selected {} of {} files for {}", fateId,
              filesToCompact.stream().map(AbstractTabletFile::getFileName)
                  .collect(Collectors.toList()),
              tablet.getFiles().stream().map(AbstractTabletFile::getFileName)
                  .collect(Collectors.toList()),
              tablet.getExtent());
        }

        if (filesToCompact.isEmpty()) {
          // the tablet has files but the selector chose none, so mark the tablet as compacted
          tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
              .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(fateId)
              .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId),
                  () -> "no files selected, attempting to mark as compacted. " + fateId);
          noneSelected++;
        } else {
          var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
              .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED, USER_COMPACTION_REQUESTED);
          // second argument is true when every file of the tablet was selected
          var selectedFiles = new SelectedFiles(filesToCompact,
              tablet.getFiles().equals(filesToCompact), fateId, manager.getSteadyTime());

          mutator.putSelectedFiles(selectedFiles);

          // We no longer need to include this marker if files are selected
          if (tablet.getUserCompactionsRequested().contains(fateId)) {
            mutator.deleteUserCompactionRequested(fateId);
          }

          // record before submitting so the result consumer can find the entry
          selectionsSubmitted.put(tablet.getExtent(), filesToCompact);

          mutator.submit(
              tabletMetadata -> (tabletMetadata.getSelectedFiles() != null
                  && tabletMetadata.getSelectedFiles().getFateId().equals(fateId))
                  || tabletMetadata.getCompacted().contains(fateId),
              () -> "selecting files for compaction. " + fateId);

          if (minSelected == null || tablet.getExtent().compareTo(minSelected) < 0) {
            minSelected = tablet.getExtent();
          }

          if (maxSelected == null || tablet.getExtent().compareTo(maxSelected) > 0) {
            maxSelected = tablet.getExtent();
          }

          selected++;
        }
      } else if (tablet.getSelectedFiles() != null) {
        if (tablet.getSelectedFiles().getFateId().equals(fateId)) {
          log.trace(
              "{} tablet {} already has {} selected files for this compaction, waiting for them be processed",
              fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size());
          alreadySelected++;
        } else {
          log.trace(
              "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed",
              fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(),
              tablet.getSelectedFiles().getFateId());
          otherSelected++;
        }
      } else if (!tablet.getExternalCompactions().isEmpty()) {
        // If there are compactions preventing selection of files, then add
        // selecting marker that prevents new compactions from starting
        if (!tablet.getUserCompactionsRequested().contains(fateId)) {
          log.trace(
              "Another compaction exists for {}, Marking {} as needing a user requested compaction",
              tablet.getExtent(), fateId);
          var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
              .requireSame(tablet, ECOMP, USER_COMPACTION_REQUESTED)
              .putUserCompactionRequested(fateId);
          mutator.submit(tm -> tm.getUserCompactionsRequested().contains(fateId),
              () -> "marking as needing a user requested compaction. " + fateId);
          userCompactionRequested++;
        } else {
          // Marker was already added and we are waiting
          log.trace("Waiting on {} for previously marked user requested compaction {} to run",
              tablet.getExtent(), fateId);
          userCompactionWaiting++;
        }
      }
    }
  } catch (InterruptedException e) {
    // Restore the interrupt status before wrapping so code further up the stack can still
    // observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (KeeperException e) {
    throw new RuntimeException(e);
  }

  long t2 = System.currentTimeMillis();

  // The Fate operation gets a table lock that prevents the table from being deleted while this is
  // running, so seeing zero tablets in the metadata table is unexpected.
  Preconditions.checkState(total > 0,
      "No tablets were seen for table %s in the compaction range %s %s", tableId,
      startRow == null ? new Text() : new Text(startRow),
      endRow == null ? new Text() : new Text(endRow));

  log.debug(
      "{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{} selected_by_other:{} "
          + "no_files:{} none_selected:{} user_compaction_requested:{} user_compaction_waiting:{} "
          + "opids:{} scan_update_time:{}ms",
      fateId, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected,
      userCompactionRequested, userCompactionWaiting, opidsSeen, t2 - t1);

  if (selected > 0) {
    // minSelected and maxSelected are both non-null here because they are assigned whenever
    // selected is incremented
    manager.getEventCoordinator().event(
        new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()),
        "%s selected files for compaction for %d tablets", fateId, selected);
  }

  return total - complete;
}