public void reduce()

in src/java/org/apache/nutch/crawl/CrawlDbReader.java [431:506]


    /**
     * Aggregates all partial statistics emitted for a single statistics key.
     * <p>
     * The aggregation depends on the key:
     * <ul>
     * <li>{@code T}, {@code status*}, {@code retry*}, {@code ftt},
     * {@code fit}: the {@link LongWritable} values are summed and the sum is
     * written once;</li>
     * <li>{@code sc}: the minimum and the maximum of the {@link FloatWritable}
     * values are written as two records, minimum first;</li>
     * <li>{@code ft}, {@code fi}: the minimum and the maximum of the
     * {@link LongWritable} values are written as two records, minimum
     * first;</li>
     * <li>{@code sct}: the {@link FloatWritable} values are summed and the sum
     * is written once;</li>
     * <li>{@code scd}: serialized t-digests ({@link BytesWritable}) and raw
     * values ({@link FloatWritable}, NaN skipped) are merged into one t-digest
     * which is written in its compact serialized form. Nothing is written if
     * the group did not contain any usable value.</li>
     * </ul>
     * Values of any other key are ignored.
     *
     * @param key
     *          the statistics key selecting the aggregation
     * @param values
     *          the partial values collected for {@code key}
     * @param context
     *          the context the aggregated value(s) are written to
     * @throws IOException
     *           if writing to the context fails
     * @throws InterruptedException
     *           if writing to the context is interrupted
     */
    public void reduce(Text key, Iterable<NutchWritable> values,
        Context context) throws IOException, InterruptedException {
      String k = key.toString();
      if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
          || k.equals("ftt") || k.equals("fit")) {
        // sum all values for this key
        long sum = 0;
        for (NutchWritable value : values) {
          sum += ((LongWritable) value.get()).get();
        }
        // output sum
        context.write(key, new NutchWritable(new LongWritable(sum)));
      } else if (k.equals("sc")) {
        float min = Float.MAX_VALUE;
        // Float.MIN_VALUE is the smallest *positive* float and would be
        // reported as maximum if all scores are zero or negative, so start
        // from the most negative finite value instead.
        float max = -Float.MAX_VALUE;
        for (NutchWritable nvalue : values) {
          float value = ((FloatWritable) nvalue.get()).get();
          if (max < value) {
            max = value;
          }
          if (min > value) {
            min = value;
          }
        }
        // minimum is written first, maximum second
        context.write(key, new NutchWritable(new FloatWritable(min)));
        context.write(key, new NutchWritable(new FloatWritable(max)));
      } else if (k.equals("ft") || k.equals("fi")) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (NutchWritable nvalue : values) {
          long value = ((LongWritable) nvalue.get()).get();
          if (max < value) {
            max = value;
          }
          if (min > value) {
            min = value;
          }
        }
        // minimum is written first, maximum second
        context.write(key, new NutchWritable(new LongWritable(min)));
        context.write(key, new NutchWritable(new LongWritable(max)));
      } else if (k.equals("sct")) {
        float cnt = 0.0f;
        for (NutchWritable nvalue : values) {
          float value = ((FloatWritable) nvalue.get()).get();
          cnt += value;
        }
        context.write(key, new NutchWritable(new FloatWritable(cnt)));
      } else if (k.equals("scd")) {
        MergingDigest tdigest = null;
        for (NutchWritable nvalue : values) {
          Writable value = nvalue.get();
          if (value instanceof BytesWritable) {
            BytesWritable serialized = (BytesWritable) value;
            // getBytes() returns the backing array which may be longer than
            // the valid payload, so limit the buffer to getLength()
            MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer
                .wrap(serialized.getBytes(), 0, serialized.getLength()));
            if (tdigest == null) {
              tdigest = tdig;
            } else {
              tdigest.add(tdig);
            }
          } else if (value instanceof FloatWritable) {
            float val = ((FloatWritable) value).get();
            if (!Float.isNaN(val)) {
              if (tdigest == null) {
                tdigest = (MergingDigest) TDigest.createMergingDigest(100.0);
              }
              tdigest.add(val);
            }
          }
        }
        if (tdigest == null) {
          // no usable value in this group (e.g., all scores are NaN):
          // there is no digest to serialize, so emit nothing
          return;
        }
        ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
        tdigest.asSmallBytes(tdigestBytes);
        context.write(key,
            new NutchWritable(new BytesWritable(tdigestBytes.array())));
      }
    }