public void reduce()

in src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java [154:417]

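Aggregates all CrawlDatum, HostDatum and score (FloatWritable) values for a
single host key: crawl statuses are counted, configured string and numeric
metadata fields are summarized (per-value counts, minimum, maximum, average
and t-digest percentiles), and the host is then either queued for a DNS check
in a resolver thread or written directly to the host db.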

  public void reduce(Text key, Iterable<NutchWritable> values,
    Context context) throws IOException, InterruptedException {

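    // Aggregation buffers, keyed by metadata field name: string fields get
    // per-value occurrence counts; numeric fields get minimum, maximum,
    // sum and count (for averages) and a t-digest sketch for one-pass
    // percentile estimation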
    Map<String,Map<String,Long>> stringCounts = new HashMap<>();
    Map<String,Float> maximums = new HashMap<>();
    Map<String,Float> sums = new HashMap<>(); // used to calc averages
    Map<String,Long> counts = new HashMap<>(); // used to calc averages
    Map<String,Float> minimums = new HashMap<>();
    Map<String,TDigest> tdigests = new HashMap<>(); // used to calc percentiles
    
    HostDatum hostDatum = new HostDatum();
    float score = 0;
    
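    // Pre-create a counting map per configured string field so the lookups
    // in the value loop below never return null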
    if (stringFields != null) {
      for (int i = 0; i < stringFields.length; i++) {
        stringCounts.put(stringFields[i], new HashMap<>());
      }
    }
    
    // Loop through all values; if an existing HostDatum is found it is
    // merged below, otherwise the empty one created above stands for a
    // host that is new to the host db
    for (NutchWritable val : values) {
      final Writable value = val.get(); // unwrap
      
      // Count crawl datum statuses and collect metadata from fields
      if (value instanceof CrawlDatum) {
        CrawlDatum buffer = (CrawlDatum)value;
        
        // Set the correct status field
        switch (buffer.getStatus()) {
          case CrawlDatum.STATUS_DB_UNFETCHED:
            hostDatum.setUnfetched(hostDatum.getUnfetched() + 1L);
            break;

          case CrawlDatum.STATUS_DB_FETCHED:
            hostDatum.setFetched(hostDatum.getFetched() + 1L);
            break;

          case CrawlDatum.STATUS_DB_GONE:
            hostDatum.setGone(hostDatum.getGone() + 1L);
            break;

          case CrawlDatum.STATUS_DB_REDIR_TEMP:
            hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1L);
            break;

          case CrawlDatum.STATUS_DB_REDIR_PERM:
            hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1L);
            break;

          case CrawlDatum.STATUS_DB_NOTMODIFIED:
            hostDatum.setNotModified(hostDatum.getNotModified() + 1L);
            break;

          default:
            // Other statuses are not counted
            break;
        }
        
        // Record connection failures (any retries since fetch count as a failure)
        if (buffer.getRetriesSinceFetch() != 0) {
          hostDatum.incConnectionFailures();
        }
        
        // Only gather metadata statistics for properly fetched pages
        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED
            || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
          // Deal with the string fields
          if (stringFields != null) {
            for (int i = 0; i < stringFields.length; i++) {
              // Does this field exist?
              if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
                // Get it!
                String metadataValue = null;
                try {
                  metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
                } catch (Exception e) {
                  LOG.error("Metadata field " + stringFields[i] + " could not be read as a string");
                }
              
                // Skip this field if the value could not be read
                if (metadataValue == null) {
                  continue;
                }

                // Does the value exist?
                if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
                  // Yes, increment it
                  stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1L);
                } else {
                  // Create it!
                  stringCounts.get(stringFields[i]).put(metadataValue, 1L);
                }
              }
            }
          }
          
          // Deal with the numeric fields
          if (numericFields != null) {
            for (int i = 0; i < numericFields.length; i++) {
              // Does this field exist?
              if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
                try {
                  // Get it!
                  float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());

                  // Does a t-digest sketch for this field exist?
                  if (tdigests.containsKey(numericFields[i])) {
                    tdigests.get(numericFields[i]).add(metadataValue);
                  } else {
                    // Create it! Compression factor 100 is the usual t-digest
                    // trade-off between sketch size and quantile accuracy
                    TDigest tdigest = TDigest.createDigest(100);
                    tdigest.add((double)metadataValue);
                    tdigests.put(numericFields[i], tdigest);
                  }
                
                  // Does the minimum value exist?
                  if (minimums.containsKey(numericFields[i])) {
                    // Write if this is lower than existing value
                    if (metadataValue < minimums.get(numericFields[i])) {
                      minimums.put(numericFields[i], metadataValue);
                    }
                  } else {
                    // Create it!
                    minimums.put(numericFields[i], metadataValue);
                  }
                  
                  // Does the maximum value exist?
                  if (maximums.containsKey(numericFields[i])) {
                    // Write if this is higher than the existing value
                    if (metadataValue > maximums.get(numericFields[i])) {
                      maximums.put(numericFields[i], metadataValue);
                    }
                  } else {
                    // Create it!
                    maximums.put(numericFields[i], metadataValue);
                  }
                  
                  // Sum it up!
                  if (sums.containsKey(numericFields[i])) {
                    // Increment
                    sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
                    counts.put(numericFields[i], counts.get(numericFields[i]) + 1L);
                  } else {
                    // Create it!
                    sums.put(numericFields[i], metadataValue);
                    counts.put(numericFields[i], 1L);
                  }
                } catch (Exception e) {
                  LOG.error("Failed to process numeric field {} for {}: {}", numericFields[i], key, e.getMessage());
                }
              }
            }
          }
        }

        // Run the count phase for optional custom CrawlDatum processors;
        // the matching finalize phase runs once per host further below
        if (crawlDatumProcessors != null) {
          for (CrawlDatumProcessor processor : crawlDatumProcessors) {
            processor.count(buffer);
          }
        }
      }

      // Merge in the existing HostDatum for this host, if the host db
      // already contains one
      else if (value instanceof HostDatum) {
        HostDatum buffer = (HostDatum)value;

        // Check homepage URL
        if (buffer.hasHomepageUrl()) {
          hostDatum.setHomepageUrl(buffer.getHomepageUrl());
        }

        // Copy the lastCheck timestamp if the existing datum is not empty
        if (!buffer.isEmpty()) {
          hostDatum.setLastCheck(buffer.getLastCheck());
        }

        // Check and set DNS failures
        if (buffer.getDnsFailures() > 0) {
          hostDatum.setDnsFailures(buffer.getDnsFailures());
        }

        // Check and set connection failures
        if (buffer.getConnectionFailures() > 0) {
          hostDatum.setConnectionFailures(buffer.getConnectionFailures());
        }
        
        // Check metadata
        if (buffer.hasMetaData()) {
          hostDatum.setMetaData(buffer.getMetaData());
        }

        // Check and set score (score from Web Graph has precedence)
        if (buffer.getScore() > 0) {
          hostDatum.setScore(buffer.getScore());
        }
      }

      // Check for the score
      else if (value instanceof FloatWritable) {
        FloatWritable buffer = (FloatWritable)value;
        score = buffer.get();
      } else {
        LOG.error("Class {} not handled", value.getClass());
      }
    }

    // Check if score was set from Web Graph
    if (score > 0) {
      hostDatum.setScore(score);
    }
    
    // Set metadata
    for (Map.Entry<String, Map<String,Long>> entry : stringCounts.entrySet()) {
      for (Map.Entry<String,Long> subEntry : entry.getValue().entrySet()) {
        hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new LongWritable(subEntry.getValue()));
      }
    }
    for (Map.Entry<String, Float> entry : maximums.entrySet()) {
      hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
    }
    for (Map.Entry<String, Float> entry : sums.entrySet()) {
      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
    }
    for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
      // Emit all configured percentiles, e.g. pct50.<field>, pct95.<field>
      for (int i = 0; i < percentiles.length; i++) {
        hostDatum.getMetaData().put(new Text("pct" + Long.toString(percentiles[i]) + "." + entry.getKey()),
          new FloatWritable((float)entry.getValue().quantile((double)percentiles[i] / 100.0d)));
      }
    }
    for (Map.Entry<String, Float> entry : minimums.entrySet()) {
      hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
    }
    
    // Impose a limit on the minimum number of URLs per host?
    if (urlLimit > -1L) {
      if (hostDatum.numRecords() < urlLimit) {
        context.getCounter("UpdateHostDb", "url_limit_not_reached").increment(1);
        return;
      }
    }
    
    context.getCounter("UpdateHostDb", "total_hosts").increment(1);

    // See if this record is to be checked
    if (shouldCheck(hostDatum)) {
      // Create a resolver thread entry for this host
      resolverThread = new ResolverThread(key.toString(), hostDatum, context, purgeFailedHostsThreshold);

      // Add the entry to the queue (blocking)
      try {
        queue.put(resolverThread);
      } catch (InterruptedException e) {
        LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
      }

      // Do not write here; the datum will be written by the resolver thread
      return;
    } else if (checkAny) {
      context.getCounter("UpdateHostDb", "skipped_not_eligible").increment(1);
      LOG.debug("UpdateHostDb: {}: skipped_not_eligible", key);
    }

    // Run the finalize phase for optional custom CrawlDatum processors
    if (crawlDatumProcessors != null) {
      for (CrawlDatumProcessor processor : crawlDatumProcessors) {
        processor.finalize(hostDatum);
      }
    }

    // Write the host datum if it wasn't written by the resolver thread
    context.write(key, hostDatum);
  }
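
For illustration (field and value names are hypothetical): with
stringFields={"Content-Type"}, numericFields={"connectTime"} and
percentiles={50,95}, a host's metadata would gain keys such as
Content-Type.text/html (an occurrence count), min.connectTime,
max.connectTime, avg.connectTime, pct50.connectTime and pct95.connectTime.

A minimal sketch of a custom processor, assuming the CrawlDatumProcessor
interface declares exactly the count(CrawlDatum) and finalize(HostDatum)
methods invoked above; the metadata key and the one-day threshold are
illustrative only:

  public class ShortIntervalProcessor implements CrawlDatumProcessor {

    // Number of pages re-fetched more often than once a day
    private long shortInterval = 0;

    @Override
    public void count(CrawlDatum crawlDatum) {
      // Invoked once per CrawlDatum in the value loop of reduce()
      if (crawlDatum.getFetchInterval() < 86400) {
        shortInterval++;
      }
    }

    @Override
    public void finalize(HostDatum hostDatum) {
      // Invoked once per host before the datum is written
      hostDatum.getMetaData().put(new Text("custom.shortInterval"),
          new LongWritable(shortInterval));
      // Reset: the same processor instance is reused for the next host
      shortInterval = 0;
    }
  }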