in hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java [83:261]
protected void chore() {
  // One cleaning pass over every MOB-enabled table known to this region server:
  //   1. for each region hosted here and each MOB column family, collect the MOB file names
  //      referenced by the store files (MOB_FILE_REFS metadata) plus the names of the files
  //      that are currently being written;
  //   2. list each MOB family directory and archive the files whose name ends with the encoded
  //      name of a region scanned here, that are not referenced any more and that are older
  //      than the configured minimum age.
  // The whole pass is abandoned when a store file with neither MOB_FILE_REFS nor a bulkload
  // marker is found: the cleaner can not proceed until such old files are MOB-compacted.
  long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
    MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
  // Only MOB files whose modification time is older than maxCreationTimeToArchive
  // (current time - minAgeToArchive) are candidates for archiving. The gap gives confidence
  // that all corresponding store files exist by the time the cleaning procedure begins and
  // are examined. Fresher MOB files are skipped and will not be archived in this pass.
  long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
  TableDescriptors htds = rs.getTableDescriptors();
  try {
    FileSystem fs = FileSystem.get(rs.getConfiguration());
    Map<String, TableDescriptor> map = null;
    try {
      map = htds.getAll();
    } catch (IOException e) {
      LOG.error("MobFileCleanerChore failed", e);
      return;
    }
    // column family name -> encoded region name -> names of MOB files still in use.
    // The innermost collection is a Set so that the per-file membership test below is O(1).
    Map<String, Map<String, Set<String>>> referencedMOBs = new HashMap<>();
    for (TableDescriptor htd : map.values()) {
      List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
      if (list.isEmpty()) {
        // The table is not MOB table, just skip it
        continue;
      }
      // Now clean obsolete files for a table
      LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
      // column family name -> encoded names of the regions whose reference scan failed in
      // this pass. The reference set of such a (family, region) pair is unknown, so none of
      // its MOB files may be treated as unreferenced.
      Map<String, Set<String>> incompleteScans = new HashMap<>();
      List<HRegion> regions = rs.getRegions(htd.getTableName());
      for (HRegion region : regions) {
        String encodedRegionName = region.getRegionInfo().getEncodedName();
        for (ColumnFamilyDescriptor hcd : list) {
          HStore store = region.getStore(hcd.getName());
          Collection<HStoreFile> sfs = store.getStorefiles();
          Set<String> regionMobs = new HashSet<String>();
          Path currentPath = null;
          try {
            // collecting referenced MOBs
            for (HStoreFile sf : sfs) {
              currentPath = sf.getPath();
              byte[] mobRefData = null;
              byte[] bulkloadMarkerData = null;
              if (sf.getReader() == null) {
                synchronized (sf) {
                  // re-check under the lock: only close the reader if this code opened it
                  boolean needCreateReader = sf.getReader() == null;
                  sf.initReader();
                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                  if (needCreateReader) {
                    // close store file to avoid memory leaks
                    sf.closeStoreFile(true);
                  }
                }
              } else {
                mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
              }
              if (mobRefData == null) {
                if (bulkloadMarkerData == null) {
                  LOG.warn(
                    "Found old store file with no MOB_FILE_REFS: {} - "
                      + "can not proceed until all old files will be MOB-compacted.",
                    currentPath);
                  return;
                } else {
                  LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
                    currentPath);
                  continue;
                }
              }
              // file may or may not have MOB references, but was created by the distributed
              // mob compaction code.
              try {
                SetMultimap<TableName, String> mobs =
                  MobUtils.deserializeMobFileRefs(mobRefData).build();
                LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
                LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
                regionMobs.addAll(mobs.values());
              } catch (RuntimeException exception) {
                throw new IOException("failure getting mob references for hfile " + sf,
                  exception);
              }
            }
            // collecting files, MOB included currently being written
            regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
              .map(path -> path.getName()).collect(Collectors.toList()));
            referencedMOBs
              .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, Set<String>>())
              .computeIfAbsent(encodedRegionName, name -> new HashSet<String>())
              .addAll(regionMobs);
          } catch (FileNotFoundException e) {
            // A store file vanished while it was being read. The references collected so far
            // are incomplete, so remember the pair and keep its MOB files out of archiving.
            LOG.warn(
              "Missing file:{} while collecting MOB references; MOB files of region={} "
                + "family={} will not be archived in this cycle",
              currentPath, encodedRegionName, hcd.getNameAsString(), e);
            incompleteScans
              .computeIfAbsent(hcd.getNameAsString(), cf -> new HashSet<String>())
              .add(encodedRegionName);
          } catch (IOException e) {
            // Same reasoning as above: an unknown reference set must never lead to archiving.
            LOG.error(
              "Failed to collect MOB references for table={} region={} family={}; "
                + "its MOB files will not be archived in this cycle",
              htd.getTableName().getNameAsString(), encodedRegionName, hcd.getNameAsString(),
              e);
            incompleteScans
              .computeIfAbsent(hcd.getNameAsString(), cf -> new HashSet<String>())
              .add(encodedRegionName);
          }
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found: {} active mob refs for table={}",
          referencedMOBs.values().stream().map(inner -> inner.values())
            .flatMap(refSets -> refSets.stream()).mapToInt(refSet -> refSet.size()).sum(),
          htd.getTableName().getNameAsString());
      }
      if (LOG.isTraceEnabled()) {
        referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
          .forEach(mobFileSet -> mobFileSet.stream().forEach(LOG::trace)));
      }
      // collect regions referencing MOB files belonging to the current rs
      Set<String> regionsCovered = new HashSet<>();
      referencedMOBs.values().stream()
        .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
      for (ColumnFamilyDescriptor hcd : list) {
        List<Path> toArchive = new ArrayList<Path>();
        String family = hcd.getNameAsString();
        // Both lookups depend only on the family, so do them once per directory listing.
        Map<String, Set<String>> cfMobs = referencedMOBs.get(family);
        Set<String> unscannedRegions = incompleteScans.get(family);
        Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
        RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
        while (rit.hasNext()) {
          LocatedFileStatus lfs = rit.next();
          Path p = lfs.getPath();
          // the text after the last '_' of the file name is used as the encoded region name
          String[] mobParts = p.getName().split("_");
          String regionName = mobParts[mobParts.length - 1];
          // skip MOB files not belonging to a region assigned to the current rs
          if (!regionsCovered.contains(regionName)) {
            LOG.trace("MOB file does not belong to current rs: {}", p);
            continue;
          }
          // skip MOB files whose region/family references could not be fully collected
          if (unscannedRegions != null && unscannedRegions.contains(regionName)) {
            LOG.trace("Skipping MOB file of a region with incomplete reference scan: {}", p);
            continue;
          }
          // check active or actively written mob files
          Set<String> regionRefs = cfMobs == null ? null : cfMobs.get(regionName);
          if (regionRefs != null && regionRefs.contains(p.getName())) {
            LOG.trace("Keeping active MOB file: {}", p);
            continue;
          }
          // MOB is not in a list of active references, but it can be too
          // fresh, skip it in this case. The listing already carries the modification time,
          // so no extra file system round trip is needed.
          long creationTime = lfs.getModificationTime();
          if (creationTime < maxCreationTimeToArchive) {
            LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
            toArchive.add(p);
          } else {
            LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
          }
        }
        LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
          toArchive.size(), htd.getTableName().getNameAsString(), family);
        // pass the descriptor's own bytes instead of re-encoding the String with the
        // platform default charset
        archiveMobFiles(rs.getConfiguration(), htd.getTableName(), hcd.getName(), toArchive);
        LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
          htd.getTableName().getNameAsString(), family);
      }
      LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
    }
  } catch (IOException e) {
    LOG.error("MOB Cleaner failed when trying to access the file system", e);
  }
}