public void onCompactionJobComplete(FileSystemDataset dataset)

in gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java [87:207]


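  /**
   * Moves the output of a completed compaction MR job to its destination directory and persists
   * bookkeeping state for the dataset (total record count, execution count, duplicate count).
   * No-ops for virtual datasets and for datasets whose configurator never created an MR job.
   */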
  public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
    if (dataset.isVirtual()) {
      return;
    }

    if (configurator != null && configurator.isJobCreated()) {
      CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
      Path tmpPath = configurator.getMrOutputPath();
      Path dstPath = new Path(result.getDstAbsoluteDir());

      // Append-delta mode: enabled when the compaction rename-source-dir setting is on;
      // new output files are appended to the destination instead of replacing it.
      boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
          MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);

      Job job = this.configurator.getConfiguredJob();

      long newTotalRecords = 0;
      long oldTotalRecords = helper.readRecordCount(new Path(result.getDstAbsoluteDir()));
      long executionCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
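      // Note: the helper reads the running totals that a previous execution persisted in a state
      // file under the destination directory (written via helper.saveState(...) further below);
      // on a first run both values are expected to default to 0.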

      List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, tmpPath, this.fs,
          ImmutableList.of(configurator.getFileExtension()));
      HashSet<Path> outputFiles = new HashSet<>();
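      // Two publish modes follow: append-delta, which appends the new output files alongside the
      // existing destination files, and full rewrite, which replaces the destination (optionally
      // writing into a fresh versioned subdirectory).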
      if (appendDeltaOutput) {
        FsPermission permission =
            HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                FsPermission.getDefault());
        WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
        // Append each good file under the MR output directory to the destination.
        for (Path filePath : goodPaths) {
          String fileName = filePath.getName();
          log.info("Adding {} to {}", filePath, dstPath);
          Path outPath = new Path(dstPath, fileName);

          if (!this.fs.rename(filePath, outPath)) {
            throw new IOException(String.format("Unable to move %s to %s", filePath, outPath));
          }
        }

        // Obtain the record count from the input file names.
        // We don't take it from the map-reduce counter because the next run's threshold (delta
        // record) calculation is based on input file names. Since the input folders for each MR
        // execution are pre-defined, it is easy to track how many files have been involved in MR
        // so far, which makes computing the total record count (all previous runs plus the
        // current run) possible.
        newTotalRecords = this.configurator.getFileNameRecordCount();
      } else {
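        // Full-rewrite mode. With RECOMPACTION_WRITE_TO_NEW_FOLDER enabled, each execution writes
        // into a new subdirectory named from COMPACTION_DIRECTORY_FORMAT and the execution count
        // (so run n+1 lands next to run n), and the previous subdirectory is recorded via
        // getOldFiles() for a later step to clean up or deregister; the exact consumer of that
        // list depends on the configured compaction suite.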
        if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
          Path oldFilePath =
              PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
          dstPath =
              PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
          this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
        } else {
          this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
        }

        // It is possible that the destination path is a non-empty directory if the previous run failed.
        // Hence, always delete the destination path before moving the tmp path to the destination path.
        this.fs.delete(dstPath, true);
        FsPermission permission =
            HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                FsPermission.getDefault());

        WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath.getParent(), permission);
        if (!this.fs.rename(tmpPath, dstPath)) {
          throw new IOException(String.format("Unable to move %s to %s", tmpPath, dstPath));
        }

        // Obtain the record count from the map-reduce job counter.
        // We don't derive it from file names because tracking which files were actually involved
        // in the MR execution is hard: new minutely data is rolled up into hourly folders, and
        // from the daily compaction's perspective we cannot tell which files are newly added
        // (we pass whole hourly folders to the MR job instead of individual files).
        Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
        newTotalRecords = counter.getValue();
      }
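      // Compute the final paths of the new output files and hand them to the configurator;
      // downstream actions (e.g. metadata or GMCE publishing) are expected to read them back
      // via getDstNewFiles().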
      final Path finalDstPath = dstPath;
      goodPaths.forEach(p -> outputFiles.add(new Path(finalDstPath, p.getName())));
      this.configurator.setDstNewFiles(outputFiles);

      State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
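      // On a re-compaction (executionCount != 0), snapshot the previous run's totals under keys
      // suffixed with the execution count before the un-suffixed keys are overwritten below.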
      if (executionCount != 0) {
        compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executionCount),
            Long.toString(oldTotalRecords));
        compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
            Long.toString(executionCount));
        compactionState.setProp(DUPLICATE_COUNT_TOTAL + Long.toString(executionCount),
            compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
      }
      if (state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED, false)) {
        // GMCE enabled: set the key to false to indicate that the GMCE has not been emitted yet.
        compactionState.setProp(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY, false);
      }
      compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
      compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
      compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
          this.configurator.getConfiguredJob().getJobID().toString());
      compactionState.setProp(DUPLICATE_COUNT_TOTAL,
          job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
      compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
          this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
      helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
      log.info("Duplicate record count for {}: {}", dstPath, compactionState.getProp(DUPLICATE_COUNT_TOTAL));

      log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
          executionCount + 1);

      // submit events for record count
      if (eventSubmitter != null) {
        Map<String, String> eventMetadataMap =
            ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
                CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
                CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
                CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1),
                CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
        this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
      }
    }
  }
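
A minimal usage sketch, assuming the constructor takes the job state and the job configurator (the
two collaborators the method reads from); in practice this action runs as one step of a Gobblin
compaction suite rather than being invoked directly:

  // Hypothetical wiring for illustration only; state, configurator and dataset would come from
  // the surrounding compaction framework.
  CompactionCompleteFileOperationAction action =
      new CompactionCompleteFileOperationAction(state, configurator);
  // Moves the MR output into place and persists record-count bookkeeping for the dataset.
  action.onCompactionJobComplete(dataset);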