protected void chore()

in hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java [83:261]


  /**
   * Archives MOB files that are no longer referenced by the regions of MOB tables hosted on this
   * region server.
   * <p>
   * For every MOB-enabled table, references are collected per column family and per locally hosted
   * region. Sources are the {@code MOB_FILE_REFS} metadata of the region's store files and the
   * names of store files currently being written. The table's MOB family directories are then
   * listed, and a MOB file is archived only when all of the following hold:
   * <ul>
   * <li>its name ends with {@code _<encoded region name>} of a local region whose references were
   * successfully collected <em>for that family</em>;</li>
   * <li>it is not among those references;</li>
   * <li>its modification time is older than now minus {@code MobConstants.MIN_AGE_TO_ARCHIVE_KEY}.</li>
   * </ul>
   * A failure while collecting the references of a (region, family) pair therefore only delays
   * cleaning of that pair and never causes a referenced file to be archived.
   */
  protected void chore() {
    long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
    // We check only those MOB files whose creation time is less than maxCreationTimeToArchive,
    // i.e. current time - minAgeToArchive. The gap gives us confidence that all corresponding
    // store files exist at the time the cleaning procedure begins and will be examined.
    // MOB files created after maxCreationTimeToArchive are skipped and won't be archived.
    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;

    TableDescriptors htds = rs.getTableDescriptors();
    try {
      FileSystem fs = FileSystem.get(rs.getConfiguration());

      Map<String, TableDescriptor> map;
      try {
        map = htds.getAll();
      } catch (IOException e) {
        LOG.error("MobFileCleanerChore failed", e);
        return;
      }
      for (TableDescriptor htd : map.values()) {
        cleanObsoleteMobFiles(fs, htd, maxCreationTimeToArchive);
      }
    } catch (IOException e) {
      LOG.error("MOB Cleaner failed when trying to access the file system", e);
    }
  }

  /**
   * Cleans the obsolete MOB files of a single table. Non-MOB tables are skipped. If the table
   * still has a store file written without {@code MOB_FILE_REFS}, only this table is skipped.
   * @param fs                       file system holding the MOB directories
   * @param htd                      descriptor of the table to clean
   * @param maxCreationTimeToArchive only unreferenced MOB files whose modification time is strictly
   *                                 below this timestamp are archived
   * @throws IOException if listing a MOB family directory or archiving fails
   */
  private void cleanObsoleteMobFiles(FileSystem fs, TableDescriptor htd,
    long maxCreationTimeToArchive) throws IOException {
    LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
    List<ColumnFamilyDescriptor> mobFamilies = MobUtils.getMobColumnFamilies(htd);
    if (mobFamilies.isEmpty()) {
      // The table is not MOB table, just skip it
      return;
    }

    Map<String, Map<String, Set<String>>> referencedMOBs =
      collectReferencedMobFiles(htd, mobFamilies);
    if (referencedMOBs == null) {
      // A store file without MOB_FILE_REFS was found (already logged). Its references are unknown,
      // so nothing in this table can safely be archived during this run.
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Found: {} active mob refs for table={}",
        referencedMOBs.values().stream().flatMap(regionMap -> regionMap.values().stream())
          .mapToInt(Set::size).sum(),
        htd.getTableName().getNameAsString());
    }
    if (LOG.isTraceEnabled()) {
      referencedMOBs.values().forEach(
        regionMap -> regionMap.values().forEach(mobFiles -> mobFiles.forEach(LOG::trace)));
    }

    for (ColumnFamilyDescriptor hcd : mobFamilies) {
      archiveUnreferencedMobFiles(fs, htd, hcd, referencedMOBs.get(hcd.getNameAsString()),
        maxCreationTimeToArchive);
    }

    LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
  }

  /**
   * Collects the MOB file names referenced by the locally hosted regions of a table.
   * <p>
   * A (family, region) entry is present in the result only if its references were collected
   * without error. Callers must treat a missing entry as "unknown", not as "no references".
   * @param htd         descriptor of the table
   * @param mobFamilies MOB-enabled column families of the table
   * @return family name -> (encoded region name -> referenced MOB file names, including store files
   *         currently being written); or {@code null} if a store file without
   *         {@code MOB_FILE_REFS} (and without a bulkload marker) was found
   */
  private Map<String, Map<String, Set<String>>> collectReferencedMobFiles(TableDescriptor htd,
    List<ColumnFamilyDescriptor> mobFamilies) {
    Map<String, Map<String, Set<String>>> referencedMOBs = new HashMap<>();
    for (HRegion region : rs.getRegions(htd.getTableName())) {
      String regionName = region.getRegionInfo().getEncodedName();
      for (ColumnFamilyDescriptor hcd : mobFamilies) {
        HStore store = region.getStore(hcd.getName());
        if (store == null) {
          // No references can be collected, so the region stays uncovered for this family and
          // none of its MOB files will be archived.
          LOG.debug("No store for family={} in region={}, skipping", hcd.getNameAsString(),
            regionName);
          continue;
        }
        Set<String> regionMobs = new HashSet<>();
        Path currentPath = null;
        try {
          // collecting referenced MOBs
          for (HStoreFile sf : store.getStorefiles()) {
            currentPath = sf.getPath();
            byte[] mobRefData = null;
            byte[] bulkloadMarkerData = null;
            if (sf.getReader() == null) {
              synchronized (sf) {
                boolean needCreateReader = sf.getReader() == null;
                if (needCreateReader) {
                  sf.initReader();
                }
                try {
                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                } finally {
                  if (needCreateReader) {
                    // close the reader we opened, even on failure, to avoid leaking it
                    sf.closeStoreFile(true);
                  }
                }
              }
            } else {
              mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
              bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
            }

            if (mobRefData == null) {
              if (bulkloadMarkerData == null) {
                LOG.warn(
                  "Found old store file with no MOB_FILE_REFS: {} - "
                    + "can not proceed until all old files will be MOB-compacted.",
                  currentPath);
                return null;
              }
              LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
                currentPath);
              continue;
            }
            // file may or may not have MOB references, but was created by the distributed
            // mob compaction code.
            try {
              SetMultimap<TableName, String> mobs =
                MobUtils.deserializeMobFileRefs(mobRefData).build();
              LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
              LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
              regionMobs.addAll(mobs.values());
            } catch (RuntimeException exception) {
              throw new IOException("failure getting mob references for hfile " + sf,
                exception);
            }
          }
          // collecting files, MOB included currently being written
          regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
            .map(path -> path.getName()).collect(Collectors.toList()));

          // Record the pair only after successful collection: its presence is what later allows
          // this region's MOB files of this family to be archived.
          referencedMOBs.computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<>())
            .computeIfAbsent(regionName, name -> new HashSet<>()).addAll(regionMobs);
        } catch (FileNotFoundException e) {
          LOG.warn("Missing file:{} - skipping MOB cleaning of region={} family={} in this cycle",
            currentPath, regionName, hcd.getNameAsString(), e);
        } catch (IOException e) {
          LOG.error("Failed to clean the obsolete mob files for table={}",
            htd.getTableName().getNameAsString(), e);
        }
      }
    }
    return referencedMOBs;
  }

  /**
   * Archives the unreferenced, sufficiently old MOB files of one column family of a table.
   * @param fs                       file system holding the MOB directories
   * @param htd                      descriptor of the table
   * @param hcd                      MOB-enabled column family to clean
   * @param regionRefs               encoded region name -> referenced MOB file names for this
   *                                 family, covering only the regions whose references were
   *                                 successfully collected; may be {@code null}
   * @param maxCreationTimeToArchive only files with a modification time strictly below this value
   *                                 are archived
   * @throws IOException if listing the MOB family directory or archiving fails
   */
  private void archiveUnreferencedMobFiles(FileSystem fs, TableDescriptor htd,
    ColumnFamilyDescriptor hcd, Map<String, Set<String>> regionRefs,
    long maxCreationTimeToArchive) throws IOException {
    String family = hcd.getNameAsString();
    if (regionRefs == null || regionRefs.isEmpty()) {
      // No covered region of this table/family on this rs: every file would be skipped anyway,
      // so avoid listing the (possibly large) MOB directory.
      LOG.debug("No regions with collected MOB refs for table={} family={} on this rs",
        htd.getTableName().getNameAsString(), family);
      return;
    }

    Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
    RemoteIterator<LocatedFileStatus> rit;
    try {
      rit = fs.listLocatedStatus(dir);
    } catch (FileNotFoundException e) {
      // The MOB family directory may not exist yet (e.g. no MOB data flushed); nothing to clean.
      LOG.debug("MOB family directory {} does not exist, nothing to clean", dir);
      return;
    }

    List<Path> toArchive = new ArrayList<>();
    while (rit.hasNext()) {
      LocatedFileStatus lfs = rit.next();
      Path p = lfs.getPath();
      // MOB file names end with "_<encoded name of the region that created them>"
      String[] mobParts = p.getName().split("_");
      String regionName = mobParts[mobParts.length - 1];

      // skip MOB files whose region is not hosted here or whose refs could not be collected
      Set<String> referenced = regionRefs.get(regionName);
      if (referenced == null) {
        LOG.trace("MOB file does not belong to current rs: {}", p);
        continue;
      }

      // check active or actively written mob files
      if (referenced.contains(p.getName())) {
        LOG.trace("Keeping active MOB file: {}", p);
        continue;
      }

      // MOB is not in a list of active references, but it can be too fresh, skip it in this case.
      // The listing already carries the modification time; no extra NameNode call is needed.
      long creationTime = lfs.getModificationTime();
      if (creationTime < maxCreationTimeToArchive) {
        LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
        toArchive.add(p);
      } else {
        LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
      }
    }
    LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
      htd.getTableName().getNameAsString(), family);
    // hcd.getName() is the canonical family name bytes (String.getBytes() would use the
    // platform charset)
    archiveMobFiles(rs.getConfiguration(), htd.getTableName(), hcd.getName(), toArchive);
    LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
      htd.getTableName().getNameAsString(), family);
  }