public void dump(Path segment, Path output)

in src/java/org/apache/nutch/segment/SegmentReader.java [220:319]


  public void dump(Path segment, Path output) throws IOException,
      InterruptedException, ClassNotFoundException {

    LOG.info("SegmentReader: dump segment: {}", segment);

    Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment);
    Configuration conf = job.getConfiguration();

    List<Path> inputDirs = new ArrayList<>();
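    // the instance flags (ge, fe, pa, co, pd, pt) select which segment
    // subdirectories are included in the dump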
    if (ge) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.GENERATE_DIR_NAME);
    }
    if (fe) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.FETCH_DIR_NAME);
    }
    if (pa) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.PARSE_DIR_NAME);
    }
    if (co) {
      addSegmSubDirIfExists(inputDirs, conf, segment, Content.DIR_NAME);
    }
    if (pd) {
      addSegmSubDirIfExists(inputDirs, conf, segment, ParseData.DIR_NAME);
    }
    if (pt) {
      addSegmSubDirIfExists(inputDirs, conf, segment, ParseText.DIR_NAME);
    }
    if (inputDirs.isEmpty()) {
      String msg = "No segment subdirectories defined as input";
      LOG.error(msg);
      throw new RuntimeException(msg);
    }
    for (Path p : inputDirs) {
      FileInputFormat.addInputPath(job, p);
    }

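    // read the selected subdirectories as SequenceFiles; the compat mapper and
    // reducer combine the different record types into NutchWritable values per key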
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(InputCompatMapper.class);
    job.setReducerClass(InputCompatReducer.class);
    job.setJarByClass(SegmentReader.class);

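    // the job writes to a temporary directory; its part files are concatenated
    // into a single dump file further below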
    Path tempDir = new Path(conf.get("hadoop.tmp.dir", "/tmp") + "/segread-"
        + RANDOM.nextInt());
    FileSystem fs = tempDir.getFileSystem(conf);
    fs.delete(tempDir, true);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchWritable.class);

    try {
      boolean success = job.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("SegmentReader", job);
        LOG.error(message);
        throw new RuntimeException(message);
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error(StringUtils.stringifyException(e));
      throw e;
    }

    // concatenate the output
    Path dumpFile = new Path(output, conf.get("segment.dump.dir", "dump"));
    FileSystem outFs = dumpFile.getFileSystem(conf);

    // remove the old file
    outFs.delete(dumpFile, true);
    FileStatus[] fstats = fs.listStatus(tempDir,
        HadoopFSUtil.getPassAllFilter());
    Path[] files = HadoopFSUtil.getPaths(fstats);

    int currentRecordNumber = 0;
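    // append every part file of the job output to the single dump file,
    // carrying a running record number across files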
    if (files.length > 0) {
      try (PrintWriter writer = new PrintWriter(
          new BufferedWriter(new OutputStreamWriter(outFs.create(dumpFile),
              StandardCharsets.UTF_8)))) {

        for (int i = 0; i < files.length; i++) {
          Path partFile = files[i];
          try {
            currentRecordNumber = append(fs, conf, partFile, writer,
                currentRecordNumber);
          } catch (IOException exception) {
            LOG.warn("Couldn't copy the content of {} into {}: {}", partFile,
                dumpFile, exception.getMessage());
          }
        }
      }
    }
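    // clean up the temporary job output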
    fs.delete(tempDir, true);
    LOG.info("SegmentReader: done");
  }
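
A minimal usage sketch, assuming SegmentReader implements the Hadoop Tool interface
(as its command-line entry point suggests) and that "-dump <segment_dir> <output>"
mirrors the readseg usage; the segment and output paths below are placeholders:

  import org.apache.hadoop.util.ToolRunner;
  import org.apache.nutch.segment.SegmentReader;
  import org.apache.nutch.util.NutchConfiguration;

  public class DumpSegmentExample {
    public static void main(String[] args) throws Exception {
      // runs the dump through the Tool interface; paths are hypothetical
      int res = ToolRunner.run(NutchConfiguration.create(), new SegmentReader(),
          new String[] { "-dump", "crawl/segments/20240101000000", "dump-out" });
      System.exit(res);
    }
  }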