public CompactionStats call()

in server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java [316:471]


  public CompactionStats call()
      throws IOException, CompactionCanceledException, InterruptedException {
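    // Overall flow: register the compaction, open the output writer, compact each configured
    // locality group and then the default group, close the output, and return the stats. The
    // finally block restores the thread name, deregisters the compaction, and deletes any
    // partially written output file.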

    FileSKVWriter mfw = null;

    CompactionStats majCStats = new CompactionStats();

    startTime = Timer.startNew();

    // Set.add returns true only if this compactor was not already registered; remember that so
    // the finally block removes only what this call added.
    boolean remove = runningCompactions.add(this);

    String threadStartDate = dateFormatter.format(new Date());

    // Reset the per-compaction entry counters; they are folded back into the global counts in
    // the finally block below.
    clearCurrentEntryCounts();

    String oldThreadName = Thread.currentThread().getName();
    String newThreadName =
        "MajC compacting " + extent + " started " + threadStartDate + " file: " + outputFile;
    Thread.currentThread().setName(newThreadName);
    // Use try with resources for clearing the thread instead of finally because clearing may
    // throw an exception. An exception thrown from a finally block replaces the exception
    // already in flight, whereas try-with-resources records it as a suppressed exception.
    try (var ignored = setThread()) {
      FileOperations fileFactory = FileOperations.getInstance();
      FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());

      // Normally you would not want the DataNode to continue to
      // cache blocks in the page cache for compaction input files,
      // as these files are usually marked for deletion after a
      // compaction occurs. However, there are cases where the
      // compaction input files will continue to be used, such as
      // bulk import files, which may be assigned to many tablets
      // and will still be needed until all of those tablets have
      // compacted, or cloned tables, where one of the tables has
      // compacted the input file but the other has not.
      final String dropCachePrefixProperty =
          acuTableConf.get(Property.TABLE_COMPACTION_INPUT_DROP_CACHE_BEHIND);
      final EnumSet<FilePrefix> dropCacheFileTypes =
          ConfigurationTypeHelper.getDropCacheBehindFilePrefixes(dropCachePrefixProperty);
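      // The resulting set holds the file-name prefixes whose input readers should advise the
      // OS to drop cached pages after reading; the property lets file types that may be read
      // again (see above) be excluded.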

      final boolean isMinC = env.getIteratorScope() == IteratorUtil.IteratorScope.minc;

      // Never drop the page cache behind output files for the root and metadata tables; for
      // all other tables, honor the per-scope (minor vs. major compaction) output setting.
      final boolean dropCacheBehindOutput =
          !SystemTables.ROOT.tableId().equals(this.extent.tableId())
              && !SystemTables.METADATA.tableId().equals(this.extent.tableId())
              && ((isMinC && acuTableConf.getBoolean(Property.TABLE_MINC_OUTPUT_DROP_CACHE))
                  || (!isMinC && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE)));

      WriterBuilder outBuilder =
          fileFactory.newWriterBuilder().forFile(outputFile, ns, ns.getConf(), cryptoService)
              .withTableConfiguration(acuTableConf);
      if (dropCacheBehindOutput) {
        outBuilder.dropCachesBehind();
      }
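      // Build the output writer; from this point on, every failure path must close it and
      // delete the partial output file (see the catch and finally blocks below).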
      mfw = outBuilder.build();

      Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);
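      // lGroups maps each configured locality group name to its set of column families.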

      long t1 = System.currentTimeMillis();

      HashSet<ByteSequence> allColumnFamilies = new HashSet<>();

      if (mfw.supportsLocalityGroups()) {
        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
          setLocalityGroup(entry.getKey());
          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats,
              dropCacheFileTypes);
          allColumnFamilies.addAll(entry.getValue());
        }
      }

      // Lastly, compact the default locality group. The column families already assigned to
      // named groups are passed with an inclusive flag of false, so only data outside the
      // named groups is written here.
      setLocalityGroup("");
      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats, dropCacheFileTypes);

      long t2 = System.currentTimeMillis();

      FileSKVWriter mfwTmp = mfw;
      mfw = null; // set this to null so we do not try to close it again in finally if the close
                  // fails
      try {
        mfwTmp.close(); // if the close fails it will cause the compaction to fail
      } catch (IOException ex) {
        // A failed close leaves the output unusable; delete the partial file before rethrowing.
        if (!fs.deleteRecursively(outputFile.getPath())) {
          if (fs.exists(outputFile.getPath())) {
            log.error("Unable to delete {}", outputFile);
          }
        }
        throw ex;
      }

      log.trace(String.format(
          "Compaction %s %,d read | %,d written | %,6d entries/sec"
              + " | %,6.3f secs | %,12d bytes | %9.3f byte/sec | %,d paused",
          extent, majCStats.getEntriesRead(), majCStats.getEntriesWritten(),
          (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0,
          mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0),
          majCStats.getTimesPaused()));

      majCStats.setFileSize(mfwTmp.getLength());
      return majCStats;
    } catch (CompactionCanceledException e) {
      log.debug("Compaction canceled {}", extent);
      throw e;
    } catch (IterationInterruptedException iie) {
      if (!env.isCompactionEnabled()) {
        log.debug("Compaction canceled {}", extent);
        throw new CompactionCanceledException();
      }
      log.debug("RFile interrupted {}", extent);
      throw iie;
    } catch (IOException | RuntimeException e) {
      Collection<String> inputFileNames =
          Collections2.transform(getFilesToCompact(), StoredTabletFile::getFileName);
      String outputFileName = outputFile.getFileName();
      log.error(
          "Compaction error. Compaction info: "
              + "extent: {}, input files: {}, output file: {}, iterators: {}, start date: {}",
          getExtent(), inputFileNames, outputFileName, getIterators(), threadStartDate, e);
      throw e;
    } finally {
      Thread.currentThread().setName(oldThreadName);
      if (remove) {
        runningCompactions.remove(this);
      }

      // Fold this compaction's entry counts into the compactor-wide totals regardless of
      // whether the compaction succeeded.
      updateGlobalEntryCounts();

      try {
        if (mfw != null) {
          // compaction must not have finished successfully, so close its output file
          try {
            mfw.close();
          } finally {
            if (!fs.deleteRecursively(outputFile.getPath())) {
              if (fs.exists(outputFile.getPath())) {
                log.error("Unable to delete {}", outputFile);
              }
            }
          }
        }
      } catch (IOException | RuntimeException e) {
      /*
       * If compaction is enabled then the compaction didn't finish due to a real error
       * condition, so log any errors from closing the output file as warnings. However, if not
       * enabled, then the compaction was canceled due to something like a tablet split, user
       * cancellation, or table deletion, which is not an error, so log close errors at debug
       * level, as they may result from an InterruptedException thrown due to the cancellation.
       */
        if (env.isCompactionEnabled()) {
          log.warn("{}", e.getMessage(), e);
        } else {
          log.debug("{}", e.getMessage(), e);
        }
      }
    }
  }
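
The try-with-resources above depends on setThread() returning an AutoCloseable that clears the
tracked compaction thread even when the body throws. That helper is outside this excerpt; the
sketch below shows one plausible shape for it, assuming a tracked-thread field that a canceller
uses to interrupt a running compaction. The class name, the AtomicReference field, and the
interrupt() method are illustrative assumptions, not the actual FileCompactor implementation.

import java.util.concurrent.atomic.AtomicReference;

class CompactionThreadTracker {
  // The thread currently running the compaction, or null when none is running.
  private final AtomicReference<Thread> thread = new AtomicReference<>();

  // Record the calling thread; closing the returned resource clears it, so
  // try (var ignored = setThread()) { ... } cleans up on every exit path, and a
  // failure while clearing surfaces as a suppressed exception instead of
  // replacing the primary one.
  AutoCloseable setThread() {
    thread.set(Thread.currentThread());
    return () -> thread.set(null);
  }

  // A canceller can use this to interrupt whichever thread is compacting, if any.
  void interrupt() {
    Thread t = thread.get();
    if (t != null) {
      t.interrupt();
    }
  }
}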