public void reduce()

in src/java/org/apache/nutch/indexer/IndexerMapReduce.java [244:431]


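    /**
     * Collect all records accumulated for a single URL (CrawlDb status, fetch
     * status, parse data and text, optional raw content, and inlinks), decide
     * whether the page is indexed, deleted or skipped, and emit the resulting
     * NutchIndexAction.
     */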
    public void reduce(Text key, Iterable<NutchWritable> values,
        Context context) throws IOException, InterruptedException {
      Inlinks inlinks = null;
      CrawlDatum dbDatum = null;
      CrawlDatum fetchDatum = null;
      Content content = null;
      ParseData parseData = null;
      ParseText parseText = null;

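      // Unwrap each NutchWritable and dispatch on the concrete value type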
      for (NutchWritable val : values) {
        final Writable value = val.get(); // unwrap
        if (value instanceof Inlinks) {
          inlinks = (Inlinks) value;
        } else if (value instanceof CrawlDatum) {
          final CrawlDatum datum = (CrawlDatum) value;
          if (CrawlDatum.hasDbStatus(datum)) {
            dbDatum = datum;
          } else if (CrawlDatum.hasFetchStatus(datum)) {
            // don't index unmodified (empty) pages
            if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
              fetchDatum = datum;
            }
          } else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
              || CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
              || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
            continue;
          } else {
            throw new RuntimeException("Unexpected status: " + datum.getStatus());
          }
        } else if (value instanceof ParseData) {
          parseData = (ParseData) value;

          // Handle the robots meta tag, see https://issues.apache.org/jira/browse/NUTCH-1434
          if (deleteRobotsNoIndex) {
            // Get the robots meta data
            String robotsMeta = parseData.getMeta(Nutch.ROBOTS_METATAG);

            // Does it carry a noindex directive for this URL?
            if (robotsMeta != null
                && robotsMeta.toLowerCase(Locale.ROOT).contains("noindex")) {
              // Delete it!
              context.write(key, DELETE_ACTION);
              context.getCounter("IndexerStatus", "deleted (robots=noindex)").increment(1);
              return;
            }
          }
        } else if (value instanceof ParseText) {
          parseText = (ParseText) value;
        } else if (value instanceof Content) {
          content = (Content)value;
        } else {
          LOG.warn("Unrecognized type: {}", value.getClass());
        }
      }

      // Delete GONE pages and redirects if deletion is enabled
      if (delete && fetchDatum != null) {
        if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
            || dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
          context.getCounter("IndexerStatus", "deleted (gone)").increment(1);
          context.write(key, DELETE_ACTION);
          return;
        }

        if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
            || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
            || dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM
            || dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
          context.getCounter("IndexerStatus", "deleted (redirects)").increment(1);
          context.write(key, DELETE_ACTION);
          return;
        }
      }

      if (fetchDatum == null || parseText == null || parseData == null) {
        return; // can't index without fetch status and parse results
      }

      // Delete pages marked as duplicates if deletion is enabled
      if (delete && dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
        context.getCounter("IndexerStatus", "deleted (duplicates)").increment(1);
        context.write(key, DELETE_ACTION);
        return;
      }

      // Skip DB_NOTMODIFIED pages if skipping is enabled
      if (skip && dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
        context.getCounter("IndexerStatus", "skipped (not modified)").increment(1);
        return;
      }

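      // Only successfully fetched and parsed pages are indexed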
      if (!parseData.getStatus().isSuccess()
          || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
        return;
      }

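      // Build the document to be indexed; the URL (key) serves as its unique id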
      NutchDocument doc = new NutchDocument();
      doc.add("id", key.toString());

      final Metadata metadata = parseData.getContentMeta();

      // add segment, used to map from merged index back to segment files
      doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));

      // add digest, used by dedup
      doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
      
      final Parse parse = new ParseImpl(parseText, parseData);
      float boost = 1.0f;
      // run scoring filters
      try {
        boost = scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
            inlinks, boost);
      } catch (final ScoringFilterException e) {
        context.getCounter("IndexerStatus", "errors (ScoringFilter)").increment(1);
        LOG.warn("Error calculating score for {}", key, e);
        return;
      }
      // apply boost to all indexed fields.
      doc.setWeight(boost);
      // store boost for use by explain and dedup
      doc.add("boost", Float.toString(boost));

      try {
        if (dbDatum != null) {
          // Indexing filters may also be interested in the signature
          fetchDatum.setSignature(dbDatum.getSignature());
          
          // extract information from dbDatum and pass it to
          // fetchDatum so that indexing filters can use it
          final Text url = (Text) dbDatum.getMetaData().get(
              Nutch.WRITABLE_REPR_URL_KEY);
          if (url != null) {
            // Representation URL also needs normalization and filtering.
            // If repr URL is excluded by filters we still accept this document
            // but represented by its primary URL ("key") which has passed URL
            // filters.
            String urlString = filterUrl(normalizeUrl(key.toString(), normalize,
                                       urlNormalizers), filter, urlFilters);
            if (urlString != null) {
              url.set(urlString);
              fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
            }
          }
        }
        // run indexing filters
        doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
      } catch (final IndexingException e) {
        LOG.warn("Error indexing {}:", key, e);
        context.getCounter("IndexerStatus", "errors (IndexingFilter)").increment(1);
        return;
      }

      // skip documents discarded by indexing filters
      if (doc == null) {
        // https://issues.apache.org/jira/browse/NUTCH-1449
        if (deleteSkippedByIndexingFilter) {
          NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
          context.write(key, action);
          context.getCounter("IndexerStatus", "deleted (IndexingFilter)").increment(1);
        } else {
          context.getCounter("IndexerStatus", "skipped (IndexingFilter)").increment(1);
        }
        return;
      }

      if (content != null) {
        // Add the original binary content
        String binary;
        if (base64) {
          // optionally encode as base64
          // Note: we need a form which works with many versions of commons-codec (1.4, 1.11 and upwards),
          // cf. NUTCH-2706. The following returns a chunked string for commons-codec 1.4:
          //   binary = Base64.encodeBase64String(content.getContent());
          binary = StringUtils.newStringUtf8(Base64.encodeBase64(content.getContent(), false, false));
        } else {
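          // keep the raw content as a string, decoded with the platform default charset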
          binary = new String(content.getContent());
        }
        doc.add("binaryContent", binary);
      }

      context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);

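      // Emit the document wrapped in an ADD action for the index writers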
      NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
      context.write(key, action);
    }
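
For reference, the Base64 compatibility workaround above (NUTCH-2706) can be demonstrated in isolation. The following is a minimal, self-contained sketch, not part of IndexerMapReduce; it only assumes commons-codec on the classpath, and the class name Base64CompatDemo is made up for illustration.

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;

public class Base64CompatDemo {
  public static void main(String[] args) {
    // content longer than one 76-character Base64 line, so chunking would show up
    byte[] data = new byte[100];

    // version-independent form used above: never chunked, not URL-safe
    String stable = StringUtils.newStringUtf8(Base64.encodeBase64(data, false, false));

    // convenience form: chunked (CRLF every 76 chars) on commons-codec 1.4, unchunked on 1.5+
    String versionDependent = Base64.encodeBase64String(data);

    System.out.println(stable.contains("\r\n"));           // always false
    System.out.println(versionDependent.contains("\r\n")); // true on 1.4, false on newer versions
  }
}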