private TreeMap processStatJobHelper()

in src/java/org/apache/nutch/crawl/CrawlDbReader.java [562:698]


  /**
   * Runs the CrawlDb statistics job and merges its output into one sorted map.
   *
   * <p>The job reads {@code <crawlDb>/current} with
   * {@link SequenceFileInputFormat}. It aggregates with
   * {@link CrawlDbStatMapper} and {@link CrawlDbStatReducer}; the reducer also
   * serves as combiner. Output goes to a temporary folder
   * {@code <crawlDb>/stat_tmp<currentTimeMillis>}, which is deleted before this
   * method returns or throws.
   *
   * <p>After merging, the helper keys {@code "sc"}, {@code "fi"} and
   * {@code "ft"} are removed from the result. They are only used to seed the
   * min/max entries ({@code <key>n} and {@code <key>x}).
   *
   * @param crawlDb path of the CrawlDb directory
   * @param config base configuration used to create the job; further settings
   *          are applied to the job's own configuration
   * @param sort value written to {@code db.reader.stats.sort} for the job
   * @return statistics keyed by name, in natural key order
   * @throws IOException if running the job or reading its output fails
   * @throws InterruptedException if interrupted while waiting for the job
   * @throws ClassNotFoundException if the job cannot load a required class
   * @throws RuntimeException if the job finishes unsuccessfully
   */
  private TreeMap<String, Writable> processStatJobHelper(String crawlDb,
      Configuration config, boolean sort)
      throws IOException, InterruptedException, ClassNotFoundException {
    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());

    Job job = Job.getInstance(config, "Nutch CrawlDbReader: " + crawlDb);
    config = job.getConfiguration();
    config.setBoolean("db.reader.stats.sort", sort);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormatClass(SequenceFileInputFormat.class);

    job.setJarByClass(CrawlDbReader.class);
    job.setMapperClass(CrawlDbStatMapper.class);
    job.setCombinerClass(CrawlDbStatReducer.class);
    job.setReducerClass(CrawlDbStatReducer.class);

    FileOutputFormat.setOutputPath(job, tmpFolder);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchWritable.class);

    // https://issues.apache.org/jira/browse/NUTCH-1029
    config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
        false);
    FileSystem fileSystem = tmpFolder.getFileSystem(config);
    try {
      try {
        boolean success = job.waitForCompletion(true);
        if (!success) {
          String message = NutchJob.getJobFailureLogMessage("CrawlDbReader",
              job);
          LOG.error(message);
          throw new RuntimeException(message);
        }
      } catch (IOException | InterruptedException | ClassNotFoundException e) {
        LOG.error(StringUtils.stringifyException(e));
        throw e;
      }

      // reading the result
      SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder,
          config);
      TreeMap<String, Writable> stats = new TreeMap<>();
      Text key = new Text();
      NutchWritable value = new NutchWritable();
      try {
        for (SequenceFile.Reader reader : readers) {
          while (reader.next(key, value)) {
            mergeStatRecord(stats, key.toString(), value.get());
          }
        }
      } finally {
        // Close every reader, including those after one that failed mid-read.
        for (SequenceFile.Reader reader : readers) {
          try {
            reader.close();
          } catch (IOException e) {
            LOG.warn("Failed to close stat reader: "
                + StringUtils.stringifyException(e));
          }
        }
      }

      // remove score, fetch interval, and fetch time
      // (used for min/max calculation)
      stats.remove("sc");
      stats.remove("fi");
      stats.remove("ft");
      return stats;
    } finally {
      // Remove the tmp folder on every path (success, job failure, read
      // failure). A failed delete must not mask the real outcome.
      try {
        fileSystem.delete(tmpFolder, true);
      } catch (IOException e) {
        LOG.warn("Failed to delete temporary folder " + tmpFolder + ": "
            + StringUtils.stringifyException(e));
      }
    }
  }

  /**
   * Folds one record of the stat job output into {@code stats}.
   *
   * <p>The first record seen for a key is stored unchanged. Later records
   * are handled by key:
   * <ul>
   * <li>{@code "sc"} updates the float entries {@code "scn"} (min) and
   * {@code "scx"} (max).</li>
   * <li>{@code "ft"} and {@code "fi"} update the long entries
   * {@code <key>n} (min) and {@code <key>x} (max).</li>
   * <li>{@code "sct"} is added to the stored float.</li>
   * <li>{@code "scd"} is merged into the stored serialized
   * {@code MergingDigest}.</li>
   * <li>Any other key is added to the stored long.</li>
   * </ul>
   *
   * @param stats accumulated statistics, updated in place
   * @param k statistic name
   * @param newValue value just read for {@code k}; it is cast to the type the
   *          matching branch expects
   */
  private static void mergeStatRecord(TreeMap<String, Writable> stats,
      String k, Writable newValue) {
    Writable val = stats.get(k);
    if (val == null) {
      stats.put(k, newValue);
      return;
    }
    if (k.equals("sc")) {
      // Seed from the stored extreme, or from the first "sc" value if no
      // extreme has been recorded yet (val is that first value here).
      float min = stats.containsKey("scn")
          ? ((FloatWritable) stats.get("scn")).get()
          : ((FloatWritable) val).get();
      float max = stats.containsKey("scx")
          ? ((FloatWritable) stats.get("scx")).get()
          : ((FloatWritable) val).get();
      float fvalue = ((FloatWritable) newValue).get();
      if (min > fvalue) {
        min = fvalue;
      }
      if (max < fvalue) {
        max = fvalue;
      }
      stats.put("scn", new FloatWritable(min));
      stats.put("scx", new FloatWritable(max));
    } else if (k.equals("ft") || k.equals("fi")) {
      String minKey = k + "n";
      String maxKey = k + "x";
      long min = stats.containsKey(minKey)
          ? ((LongWritable) stats.get(minKey)).get()
          : ((LongWritable) val).get();
      long max = stats.containsKey(maxKey)
          ? ((LongWritable) stats.get(maxKey)).get()
          : ((LongWritable) val).get();
      long lvalue = ((LongWritable) newValue).get();
      if (min > lvalue) {
        min = lvalue;
      }
      if (max < lvalue) {
        max = lvalue;
      }
      stats.put(minKey, new LongWritable(min));
      stats.put(maxKey, new LongWritable(max));
    } else if (k.equals("sct")) {
      FloatWritable fvalue = (FloatWritable) newValue;
      ((FloatWritable) val).set(((FloatWritable) val).get() + fvalue.get());
    } else if (k.equals("scd")) {
      // getBytes() exposes the backing array, which may be longer than the
      // valid data; only deserialise the first getLength() bytes.
      BytesWritable incoming = (BytesWritable) newValue;
      MergingDigest tdigest = MergingDigest.fromBytes(
          ByteBuffer.wrap(incoming.getBytes(), 0, incoming.getLength()));
      if (val instanceof BytesWritable) {
        BytesWritable stored = (BytesWritable) val;
        MergingDigest storedDigest = MergingDigest.fromBytes(
            ByteBuffer.wrap(stored.getBytes(), 0, stored.getLength()));
        storedDigest.add(tdigest);
        tdigest = storedDigest;
      }
      ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
      tdigest.asSmallBytes(tdigestBytes);
      stats.put(k, new BytesWritable(tdigestBytes.array()));
    } else {
      LongWritable lvalue = (LongWritable) newValue;
      ((LongWritable) val).set(((LongWritable) val).get() + lvalue.get());
    }
  }