in hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java [1307:1413]
private static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
final FileSystem fs, final Path hbaseRootDir, TableName tableName,
final PathFilter sfFilter, ExecutorService executor)
throws IOException, InterruptedException {
final Map<String, Path> finalResultMap =
resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
// only include the directory paths to tables
Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
// Inside a table, there are compaction.dir directories to skip. Otherwise, all else
// should be regions.
final FSUtils.FamilyDirFilter familyFilter = new FSUtils.FamilyDirFilter(fs);
final Vector<Exception> exceptions = new Vector<>();
try {
List<FileStatus> regionDirs =
FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
if (regionDirs == null) {
return finalResultMap;
}
final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
for (FileStatus regionDir : regionDirs) {
final Path dd = regionDir.getPath();
if (!exceptions.isEmpty()) {
break;
}
Runnable getRegionStoreFileMapCall = new Runnable() {
@Override
public void run() {
try {
HashMap<String,Path> regionStoreFileMap = new HashMap<>();
List<FileStatus> familyDirs =
FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
if (familyDirs == null) {
if (!fs.exists(dd)) {
LOG.warn("Skipping region because it no longer exists: " + dd);
} else {
LOG.warn("Skipping region because it has no family dirs: " + dd);
}
return;
}
for (FileStatus familyDir : familyDirs) {
Path family = familyDir.getPath();
if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
continue;
}
// now in family, iterate over the StoreFiles and
// put in map
FileStatus[] familyStatus = fs.listStatus(family);
for (FileStatus sfStatus : familyStatus) {
Path sf = sfStatus.getPath();
if (sfFilter == null || sfFilter.accept(sf)) {
regionStoreFileMap.put(sf.getName(), sf);
}
}
}
finalResultMap.putAll(regionStoreFileMap);
} catch (Exception e) {
LOG.error("Could not get region store file map for region: " + dd, e);
exceptions.add(e);
}
}
};
// If executor is available, submit async tasks to exec concurrently, otherwise
// just do serial sync execution
if (executor != null) {
Future<?> future = executor.submit(getRegionStoreFileMapCall);
futures.add(future);
} else {
FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
future.run();
futures.add(future);
}
}
// Ensure all pending tasks are complete (or that we run into an exception)
for (Future<?> f : futures) {
if (!exceptions.isEmpty()) {
break;
}
try {
f.get();
} catch (ExecutionException e) {
LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
// Shouldn't happen, we already logged/caught any exceptions in the Runnable
}
}
} catch (IOException e) {
LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
exceptions.add(e);
} finally {
if (!exceptions.isEmpty()) {
// Just throw the first exception as an indication something bad happened
// Don't need to propagate all the exceptions, we already logged them all anyway
Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
throw Throwables.propagate(exceptions.firstElement());
}
}
return finalResultMap;
}