in src/java/org/apache/nutch/indexer/IndexerMapReduce.java [244:431]
public void reduce(Text key, Iterable<NutchWritable> values,
Context context) throws IOException, InterruptedException {
Inlinks inlinks = null;
CrawlDatum dbDatum = null;
CrawlDatum fetchDatum = null;
Content content = null;
ParseData parseData = null;
ParseText parseText = null;
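// Collect the records emitted for this URL: CrawlDb and fetch status,
// raw content, parse data/text and inlinks.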
for (NutchWritable val : values) {
final Writable value = val.get(); // unwrap
if (value instanceof Inlinks) {
inlinks = (Inlinks) value;
} else if (value instanceof CrawlDatum) {
final CrawlDatum datum = (CrawlDatum) value;
if (CrawlDatum.hasDbStatus(datum)) {
dbDatum = datum;
} else if (CrawlDatum.hasFetchStatus(datum)) {
// don't index unmodified (empty) pages
if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
fetchDatum = datum;
}
} else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
|| CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
|| CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
continue;
} else {
throw new RuntimeException("Unexpected status: " + datum.getStatus());
}
} else if (value instanceof ParseData) {
parseData = (ParseData) value;
// Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434
if (deleteRobotsNoIndex) {
// Get the robots meta data
String robotsMeta = parseData.getMeta(Nutch.ROBOTS_METATAG);
// Does it specify noindex for this URL?
if (robotsMeta != null
&& robotsMeta.toLowerCase(Locale.ROOT).contains("noindex")) {
// Delete it!
context.write(key, DELETE_ACTION);
context.getCounter("IndexerStatus", "deleted (robots=noindex)").increment(1);
return;
}
}
} else if (value instanceof ParseText) {
parseText = (ParseText) value;
} else if (value instanceof Content) {
content = (Content) value;
} else {
LOG.warn("Unrecognized type: {}", value.getClass());
}
}
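// All records for this key have been collected; decide whether to
// delete, skip or index the document.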
// Whether to delete GONE or REDIRECTS
if (delete && fetchDatum != null) {
if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
|| (dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)) {
context.getCounter("IndexerStatus", "deleted (gone)").increment(1);
context.write(key, DELETE_ACTION);
return;
}
if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
|| fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
|| (dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM)
|| (dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP)) {
context.getCounter("IndexerStatus", "deleted (redirects)").increment(1);
context.write(key, DELETE_ACTION);
return;
}
}
if (fetchDatum == null || parseText == null || parseData == null) {
return; // nothing to index: fetch datum or parse data/text is missing (e.g. only inlinks)
}
// Whether to delete pages marked as duplicates
if (delete && dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
context.getCounter("IndexerStatus", "deleted (duplicates)").increment(1);
context.write(key, DELETE_ACTION);
return;
}
// Whether to skip DB_NOTMODIFIED pages
if (skip && dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
context.getCounter("IndexerStatus", "skipped (not modified)").increment(1);
return;
}
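// index only pages which were successfully fetched and parsed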
if (!parseData.getStatus().isSuccess()
|| fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
return;
}
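// start building the document: the URL (key) becomes the document id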
NutchDocument doc = new NutchDocument();
doc.add("id", key.toString());
final Metadata metadata = parseData.getContentMeta();
// add segment, used to map from merged index back to segment files
doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
// add digest, used by dedup
doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
final Parse parse = new ParseImpl(parseText, parseData);
float boost = 1.0f;
// run scoring filters
try {
boost = scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
inlinks, boost);
} catch (final ScoringFilterException e) {
context.getCounter("IndexerStatus", "errors (ScoringFilter)").increment(1);
LOG.warn("Error calculating score {}: {}", key, e);
return;
}
// apply boost to all indexed fields.
doc.setWeight(boost);
// store boost for use by explain and dedup
doc.add("boost", Float.toString(boost));
try {
if (dbDatum != null) {
// Indexing filters may also be interested in the signature
fetchDatum.setSignature(dbDatum.getSignature());
// extract information from dbDatum and pass it to
// fetchDatum so that indexing filters can use it
final Text url = (Text) dbDatum.getMetaData().get(
Nutch.WRITABLE_REPR_URL_KEY);
if (url != null) {
// The representation URL also needs normalization and filtering.
// If the repr URL is excluded by the filters, we still accept this
// document, but it is represented by its primary URL ("key"), which
// has already passed the URL filters.
String urlString = filterUrl(normalizeUrl(url.toString(), normalize,
urlNormalizers), filter, urlFilters);
if (urlString != null) {
url.set(urlString);
fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
}
}
}
// run indexing filters
doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (final IndexingException e) {
LOG.warn("Error indexing {}: ", key, e);
context.getCounter("IndexerStatus", "errors (IndexingFilter)").increment(1);
return;
}
// skip documents discarded by indexing filters
if (doc == null) {
// https://issues.apache.org/jira/browse/NUTCH-1449
if (deleteSkippedByIndexingFilter) {
context.write(key, DELETE_ACTION);
context.getCounter("IndexerStatus", "deleted (IndexingFilter)").increment(1);
} else {
context.getCounter("IndexerStatus", "skipped (IndexingFilter)").increment(1);
}
return;
}
if (content != null) {
// Add the original binary content
String binary;
if (base64) {
// optionally encode as base64
// Note: we need a form which works with many versions of commons-codec (1.4, 1.11 and upwards),
// cf. NUTCH-2706. The following returns a chunked string for commons-codec 1.4:
// binary = Base64.encodeBase64String(content.getContent());
binary = StringUtils.newStringUtf8(Base64.encodeBase64(content.getContent(), false, false));
} else {
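// otherwise decode the raw bytes using the platform default charset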
binary = new String(content.getContent());
}
doc.add("binaryContent", binary);
}
context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);
NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
context.write(key, action);
}