in src/java/org/apache/nutch/crawl/CrawlDbReader.java [431:506]
/**
 * Aggregates the values emitted for one CrawlDb statistics key.
 * <ul>
 * <li>{@code T}, {@code status*}, {@code retry*}, {@code ftt}, {@code fit}:
 * {@link LongWritable} counters, written as their sum.</li>
 * <li>{@code sc}: {@link FloatWritable} scores; two records are written under
 * the same key, first the minimum, then the maximum.</li>
 * <li>{@code ft}, {@code fi}: {@link LongWritable} values; minimum, then
 * maximum.</li>
 * <li>{@code sct}: {@link FloatWritable} scores, written as their sum.</li>
 * <li>{@code scd}: a mix of serialized t-digests ({@link BytesWritable}) and
 * raw scores ({@link FloatWritable}, NaN ignored), merged into one
 * {@link MergingDigest} and written in its small byte encoding. Nothing is
 * written if no digest and no non-NaN score was received.</li>
 * </ul>
 * Keys matching none of the above produce no output.
 *
 * @param key the statistic name
 * @param values the partial values for this statistic
 * @param context the reducer context the aggregated values are written to
 * @throws IOException if writing to the context fails
 * @throws InterruptedException if the task is interrupted while writing
 */
public void reduce(Text key, Iterable<NutchWritable> values,
    Context context) throws IOException, InterruptedException {
  String k = key.toString();
  if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
      || k.equals("ftt") || k.equals("fit")) {
    // sum all values for this key
    long sum = 0;
    for (NutchWritable value : values) {
      sum += ((LongWritable) value.get()).get();
    }
    // output sum
    context.write(key, new NutchWritable(new LongWritable(sum)));
  } else if (k.equals("sc")) {
    // Float.MIN_VALUE is the smallest *positive* float and would hide a
    // maximum <= 0; seed with the most negative finite float instead.
    float min = Float.MAX_VALUE;
    float max = -Float.MAX_VALUE;
    for (NutchWritable nvalue : values) {
      float value = ((FloatWritable) nvalue.get()).get();
      // explicit comparisons (not Math.max/min) so NaN values are skipped
      if (max < value) {
        max = value;
      }
      if (min > value) {
        min = value;
      }
    }
    context.write(key, new NutchWritable(new FloatWritable(min)));
    context.write(key, new NutchWritable(new FloatWritable(max)));
  } else if (k.equals("ft") || k.equals("fi")) {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (NutchWritable nvalue : values) {
      long value = ((LongWritable) nvalue.get()).get();
      if (max < value) {
        max = value;
      }
      if (min > value) {
        min = value;
      }
    }
    context.write(key, new NutchWritable(new LongWritable(min)));
    context.write(key, new NutchWritable(new LongWritable(max)));
  } else if (k.equals("sct")) {
    // accumulate in double: a float accumulator loses precision quickly
    // when summing many scores; narrow once for the (unchanged) output type
    double sum = 0.0;
    for (NutchWritable nvalue : values) {
      sum += ((FloatWritable) nvalue.get()).get();
    }
    context.write(key, new NutchWritable(new FloatWritable((float) sum)));
  } else if (k.equals("scd")) {
    MergingDigest tdigest = null;
    for (NutchWritable nvalue : values) {
      Writable value = nvalue.get();
      if (value instanceof BytesWritable) {
        BytesWritable bytesValue = (BytesWritable) value;
        // getBytes() exposes the backing buffer, which may be longer than
        // the valid data; only wrap the first getLength() bytes
        MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer
            .wrap(bytesValue.getBytes(), 0, bytesValue.getLength()));
        if (tdigest == null) {
          tdigest = tdig;
        } else {
          tdigest.add(tdig);
        }
      } else if (value instanceof FloatWritable) {
        float val = ((FloatWritable) value).get();
        if (!Float.isNaN(val)) {
          if (tdigest == null) {
            tdigest = (MergingDigest) TDigest.createMergingDigest(100.0);
          }
          tdigest.add(val);
        }
      }
    }
    // tdigest stays null when every value was a NaN score; serializing it
    // would throw a NullPointerException and fail the task.
    // NOTE(review): assumes readers of the stats output tolerate a missing
    // "scd" record -- confirm against the code consuming this output.
    if (tdigest != null) {
      ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
      tdigest.asSmallBytes(tdigestBytes);
      context.write(key,
          new NutchWritable(new BytesWritable(tdigestBytes.array())));
    }
  }
}