in src/java/org/apache/nutch/crawl/Generator.java [424:560]
/**
 * Selects up to {@code limit} URLs per segment from the score-sorted values,
 * enforcing an optional per-host/per-domain cap ({@code maxCount}) and
 * optionally attaching a per-host variable fetch delay computed from HostDb
 * expressions. Entries that survive selection are written to the segment
 * chosen in {@code entry.segnum} via {@code mos}.
 *
 * Values are grouped by score key, so a single reduce call can see URLs from
 * many different hosts interleaved; per-host state is therefore tracked (and
 * reset) on every hostname transition inside the loop.
 *
 * @param key     sort key (inverted score); not used for selection itself
 * @param values  candidate entries, best-scored first
 * @param context used for counters and configuration
 */
public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
    Context context) throws IOException, InterruptedException {
  String currentHostname = null;
  HostDatum host = null;
  LongWritable variableFetchDelayWritable = null; // in millis
  Text variableFetchDelayKey = new Text("_variableFetchDelay_");
  // local variable maxCount may hold host-specific count set in HostDb
  int maxCount = this.maxCount;
  for (SelectorEntry entry : values) {
    Text url = entry.url;
    String urlString = url.toString();
    URL u = null;

    String hostname = URLUtil.getHost(urlString);
    if (hostname == null) {
      currentHostname = hostname;
      // Reset per-host state so an entry with an unparseable host does not
      // inherit the previous host's fetch delay or maxCount override.
      host = null;
      maxCount = this.maxCount;
      variableFetchDelayWritable = null;
      // malformed URLs are counted later on when extracting host or domain
    } else if (!hostname.equals(currentHostname)) {
      currentHostname = hostname;
      // New host: fall back to defaults BEFORE applying any per-host
      // overrides. Without this reset, a host that has no HostDatum in the
      // cache (or whose expressions fail) would silently keep the previous
      // host's variable fetch delay and maxCount.
      host = null;
      maxCount = this.maxCount;
      variableFetchDelayWritable = null;
      host = hostDatumCache.get(hostname);

      // Got it?
      if (host != null) {
        if (maxCountExpr != null) {
          try {
            // Evaluate the per-host maxCount expression against HostDb data
            long variableMaxCount = Math.round((double)maxCountExpr.execute(createContext(host)));
            LOG.debug("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
            maxCount = (int)variableMaxCount;
          } catch (Exception e) {
            LOG.error("Unable to execute variable maxCount expression because: " + e.getMessage(), e);
          }
        }

        if (fetchDelayExpr != null) {
          try {
            // Evaluate the per-host fetch delay expression (milliseconds)
            long variableFetchDelay = Math.round((double)fetchDelayExpr.execute(createContext(host)));
            LOG.debug("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
            variableFetchDelayWritable = new LongWritable(variableFetchDelay);
          } catch (Exception e) {
            LOG.error("Unable to execute fetch delay expression because: " + e.getMessage(), e);
          }
        }
      }
    }

    // Got a non-zero variable fetch delay? Add it to the datum's metadata
    if (variableFetchDelayWritable != null) {
      entry.datum.getMetaData().put(variableFetchDelayKey,
          variableFetchDelayWritable);
    }

    if (count == limit) {
      // do we have any segments left?
      if (currentsegmentnum < maxNumSegments) {
        count = 0;
        currentsegmentnum++;
      } else
        break;
    }

    String hostordomain = null;

    try {
      if (normalise && normalizers != null) {
        urlString = normalizers.normalize(urlString,
            URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      }
      u = new URL(urlString);
      if (byDomain) {
        hostordomain = URLUtil.getDomainName(u);
      } else {
        hostordomain = u.getHost();
      }
    } catch (MalformedURLException e) {
      LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
          StringUtils.stringifyException(e));
      context.getCounter("Generator", "MALFORMED_URL").increment(1);
      continue;
    }

    hostordomain = hostordomain.toLowerCase();

    // only filter if we are counting hosts or domains
    if (maxCount > 0) {
      // hostCount = { segment number for this host, URLs assigned in it }
      int[] hostCount = hostCounts.get(hostordomain);
      if (hostCount == null) {
        hostCount = new int[] { 1, 0 };
        hostCounts.put(hostordomain, hostCount);
      }

      // increment hostCount
      hostCount[1]++;

      // check if topN reached, select next segment if it is
      while (segCounts[hostCount[0] - 1] >= limit
          && hostCount[0] < maxNumSegments) {
        hostCount[0]++;
        hostCount[1] = 0;
      }

      // reached the limit of allowed URLs per host / domain
      // see if we can put it in the next segment?
      if (hostCount[1] > maxCount) {
        if (hostCount[0] < maxNumSegments) {
          hostCount[0]++;
          hostCount[1] = 1;
        } else {
          // log/count the overflow exactly once per host (first excess URL)
          if (hostCount[1] == (maxCount+1)) {
            context
                .getCounter("Generator", "HOSTS_AFFECTED_PER_HOST_OVERFLOW")
                .increment(1);
            LOG.info(
                "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
                hostordomain, maxCount, maxNumSegments);
          }
          // skip this entry
          context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
              .increment(1);
          continue;
        }
      }
      entry.segnum = new IntWritable(hostCount[0]);
      segCounts[hostCount[0] - 1]++;
    } else {
      entry.segnum = new IntWritable(currentsegmentnum);
      segCounts[currentsegmentnum - 1]++;
    }

    outputFile = generateFileName(entry);
    mos.write("sequenceFiles", key, entry, outputFile);

    // Count is incremented only when we keep the URL
    // maxCount may cause us to skip it.
    count++;
  }
}