in src/java/org/apache/nutch/crawl/CrawlDbReader.java [562:698]
/**
 * Runs the CrawlDb statistics job and merges its output into one sorted map.
 *
 * <p>
 * The job reads <code>crawlDb/{@link CrawlDb#CURRENT_NAME}</code> and writes
 * sequence files into a temporary <code>stat_tmp&lt;millis&gt;</code> folder
 * below <code>crawlDb</code>. All output files are then read back and their
 * records merged by key:
 * </p>
 * <ul>
 * <li>the first value seen for a key is stored as is,</li>
 * <li>further <code>sc</code> values update the minimum <code>scn</code> and
 * the maximum <code>scx</code>,</li>
 * <li>further <code>ft</code> / <code>fi</code> values update
 * <code>ftn</code>/<code>ftx</code> resp. <code>fin</code>/<code>fix</code>,</li>
 * <li>further <code>sct</code> values are summed as floats,</li>
 * <li>further <code>scd</code> values (serialized t-digests) are merged,</li>
 * <li>values of any other key are summed as longs.</li>
 * </ul>
 * <p>
 * The helper keys <code>sc</code>, <code>fi</code> and <code>ft</code> are
 * removed from the result. The temporary folder is deleted and all readers
 * are closed in every case, also if reading the job output fails.
 * </p>
 *
 * @param crawlDb
 *          path of the CrawlDb
 * @param config
 *          configuration the job is created from
 * @param sort
 *          stored in the job configuration as
 *          <code>db.reader.stats.sort</code>
 * @return merged statistics, sorted by key
 * @throws IOException
 *           if running the job or reading its output fails
 * @throws InterruptedException
 *           if waiting for the job is interrupted
 * @throws ClassNotFoundException
 *           if a class required by the job cannot be found
 * @throws RuntimeException
 *           if the job finished without success
 */
private TreeMap<String, Writable> processStatJobHelper(String crawlDb,
    Configuration config, boolean sort)
    throws IOException, InterruptedException, ClassNotFoundException {
  Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());

  Job job = Job.getInstance(config, "Nutch CrawlDbReader: " + crawlDb);
  // from here on work on the job's own configuration
  config = job.getConfiguration();
  config.setBoolean("db.reader.stats.sort", sort);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormatClass(SequenceFileInputFormat.class);

  job.setJarByClass(CrawlDbReader.class);
  job.setMapperClass(CrawlDbStatMapper.class);
  job.setCombinerClass(CrawlDbStatReducer.class);
  job.setReducerClass(CrawlDbStatReducer.class);

  FileOutputFormat.setOutputPath(job, tmpFolder);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NutchWritable.class);

  // https://issues.apache.org/jira/browse/NUTCH-1029
  config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  FileSystem fileSystem = tmpFolder.getFileSystem(config);
  try {
    boolean success = job.waitForCompletion(true);
    if (!success) {
      String message = NutchJob.getJobFailureLogMessage("CrawlDbReader", job);
      LOG.error(message);
      fileSystem.delete(tmpFolder, true);
      throw new RuntimeException(message);
    }
  } catch (IOException | InterruptedException | ClassNotFoundException e) {
    LOG.error(StringUtils.stringifyException(e));
    fileSystem.delete(tmpFolder, true);
    throw e;
  }

  // reading the result
  TreeMap<String, Writable> stats = new TreeMap<>();
  SequenceFile.Reader[] readers = null;
  try {
    readers = SegmentReaderUtil.getReaders(tmpFolder, config);
    Text key = new Text();
    NutchWritable value = new NutchWritable();
    for (SequenceFile.Reader reader : readers) {
      while (reader.next(key, value)) {
        String k = key.toString();
        Writable val = stats.get(k);
        if (val == null) {
          // first value for this key: keep it as the starting point
          stats.put(k, value.get());
          continue;
        }
        if (k.equals("sc")) {
          // start from the known min/max, or from the first "sc" value
          // (val) if none has been recorded yet
          float min;
          if (stats.containsKey("scn")) {
            min = ((FloatWritable) stats.get("scn")).get();
          } else {
            min = ((FloatWritable) val).get();
          }
          float max;
          if (stats.containsKey("scx")) {
            max = ((FloatWritable) stats.get("scx")).get();
          } else {
            max = ((FloatWritable) val).get();
          }
          float fvalue = ((FloatWritable) value.get()).get();
          if (min > fvalue) {
            min = fvalue;
          }
          if (max < fvalue) {
            max = fvalue;
          }
          stats.put("scn", new FloatWritable(min));
          stats.put("scx", new FloatWritable(max));
        } else if (k.equals("ft") || k.equals("fi")) {
          String minKey = k + "n";
          String maxKey = k + "x";
          // same scheme as for "sc": fall back to the first value (val)
          long min;
          if (stats.containsKey(minKey)) {
            min = ((LongWritable) stats.get(minKey)).get();
          } else {
            min = ((LongWritable) val).get();
          }
          long max;
          if (stats.containsKey(maxKey)) {
            max = ((LongWritable) stats.get(maxKey)).get();
          } else {
            max = ((LongWritable) val).get();
          }
          long lvalue = ((LongWritable) value.get()).get();
          if (min > lvalue) {
            min = lvalue;
          }
          if (max < lvalue) {
            max = lvalue;
          }
          stats.put(minKey, new LongWritable(min));
          stats.put(maxKey, new LongWritable(max));
        } else if (k.equals("sct")) {
          FloatWritable fvalue = (FloatWritable) value.get();
          ((FloatWritable) val).set(((FloatWritable) val).get() + fvalue.get());
        } else if (k.equals("scd")) {
          // BytesWritable.getBytes() returns the backing array which may be
          // longer than the valid data: wrap only the first getLength() bytes
          BytesWritable incoming = (BytesWritable) value.get();
          MergingDigest tdig = MergingDigest.fromBytes(
              ByteBuffer.wrap(incoming.getBytes(), 0, incoming.getLength()));
          MergingDigest tdigest;
          if (val instanceof BytesWritable) {
            BytesWritable known = (BytesWritable) val;
            tdigest = MergingDigest.fromBytes(
                ByteBuffer.wrap(known.getBytes(), 0, known.getLength()));
            tdigest.add(tdig);
          } else {
            tdigest = tdig;
          }
          ByteBuffer tdigestBytes = ByteBuffer
              .allocate(tdigest.smallByteSize());
          tdigest.asSmallBytes(tdigestBytes);
          stats.put(k, new BytesWritable(tdigestBytes.array()));
        } else {
          LongWritable lvalue = (LongWritable) value.get();
          ((LongWritable) val).set(((LongWritable) val).get() + lvalue.get());
        }
      }
    }
  } finally {
    try {
      if (readers != null) {
        for (SequenceFile.Reader reader : readers) {
          if (reader == null) {
            continue;
          }
          try {
            reader.close();
          } catch (IOException e) {
            // a failed close of a read-only stream must neither hide an
            // earlier error nor prevent closing the remaining readers
            LOG.warn("Failed to close reader on " + tmpFolder + ": "
                + StringUtils.stringifyException(e));
          }
        }
      }
    } finally {
      // removing the tmp folder, also if reading the result failed
      fileSystem.delete(tmpFolder, true);
    }
  }

  // remove score, fetch interval, and fetch time
  // (used for min/max calculation)
  stats.remove("sc");
  stats.remove("fi");
  stats.remove("ft");
  return stats;
}