public void reduce()

in src/java/org/apache/nutch/crawl/Generator.java [424:560]


    public void reduce(FloatWritable key, Iterable<SelectorEntry> values,
        Context context) throws IOException, InterruptedException {

      String currentHostname = null;
      HostDatum host = null;
      LongWritable variableFetchDelayWritable = null; // in millis
      Text variableFetchDelayKey = new Text("_variableFetchDelay_");
      // local maxCount may be overridden per host by a value derived from HostDb
      int maxCount = this.maxCount;
      for (SelectorEntry entry : values) {
        Text url = entry.url;
        String urlString = url.toString();
        URL u = null;

        String hostname = URLUtil.getHost(urlString);
        if (hostname == null) {
          currentHostname = hostname;
          // malformed URLs are counted later on when extracting host or domain
        } else if (!hostname.equals(currentHostname)) {
          currentHostname = hostname;
          host = hostDatumCache.get(hostname);

          // Got it?
          if (host != null) {
            if (maxCountExpr != null) {
              try {
                long variableMaxCount = Math.round((double)maxCountExpr.execute(createContext(host)));
                LOG.debug("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
                maxCount = (int)variableMaxCount;
              } catch (Exception e) {
                LOG.error("Unable to execute variable maxCount expression because: " + e.getMessage(), e);
              }
            }

            if (fetchDelayExpr != null) {
              try {
                long variableFetchDelay = Math.round((double)fetchDelayExpr.execute(createContext(host)));
                LOG.debug("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
                variableFetchDelayWritable = new LongWritable(variableFetchDelay);
              } catch (Exception e) {
                LOG.error("Unable to execute fetch delay expression because: " + e.getMessage(), e);
              }
            }
          }
        }
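        // the maxCount and fetch delay computed above stay in effect for every
        // following entry until the hostname changes again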

        // Got a variable fetch delay for this host? Add it to the datum's metadata
        if (variableFetchDelayWritable != null) {
          entry.datum.getMetaData().put(variableFetchDelayKey,
              variableFetchDelayWritable);
        }

        // the current segment has received this reducer's full quota of URLs
        if (count == limit) {
          // do we have any segments left?
          if (currentsegmentnum < maxNumSegments) {
            count = 0;
            currentsegmentnum++;
          } else
            break;
        }

        String hostordomain = null;

        try {
          if (normalise && normalizers != null) {
            urlString = normalizers.normalize(urlString,
                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
          }
          u = new URL(urlString);
          if (byDomain) {
            hostordomain = URLUtil.getDomainName(u);
          } else {
            hostordomain = u.getHost();
          }
        } catch (MalformedURLException e) {
          LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
              StringUtils.stringifyException(e));
          context.getCounter("Generator", "MALFORMED_URL").increment(1);
          continue;
        }

        hostordomain = hostordomain.toLowerCase();

        // only filter if we are counting hosts or domains
        if (maxCount > 0) {
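          // hostCount[0]: segment currently assigned to this host/domain (1-based)
          // hostCount[1]: number of its URLs assigned to that segment so far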
          int[] hostCount = hostCounts.get(hostordomain);
          if (hostCount == null) {
            hostCount = new int[] { 1, 0 };
            hostCounts.put(hostordomain, hostCount);
          }

          // one more URL for this host/domain in its current segment
          hostCount[1]++;

          // if the host's current segment already reached the limit, move on to the next segment
          while (segCounts[hostCount[0] - 1] >= limit
              && hostCount[0] < maxNumSegments) {
            hostCount[0]++;
            hostCount[1] = 0;
          }

          // reached the limit of allowed URLs per host / domain
          // see if we can put it in the next segment?
          if (hostCount[1] > maxCount) {
            if (hostCount[0] < maxNumSegments) {
              hostCount[0]++;
              hostCount[1] = 1;
            } else {
              if (hostCount[1] == (maxCount+1)) {
                context
                    .getCounter("Generator", "HOSTS_AFFECTED_PER_HOST_OVERFLOW")
                    .increment(1);
                LOG.info(
                    "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.",
                    hostordomain, maxCount, maxNumSegments);
              }
              // skip this entry
              context.getCounter("Generator", "URLS_SKIPPED_PER_HOST_OVERFLOW")
                  .increment(1);
              continue;
            }
          }
          entry.segnum = new IntWritable(hostCount[0]);
          segCounts[hostCount[0] - 1]++;
        } else {
          entry.segnum = new IntWritable(currentsegmentnum);
          segCounts[currentsegmentnum - 1]++;
        }

        // emit the entry through MultipleOutputs into the file chosen by generateFileName()
        outputFile = generateFileName(entry);
        mos.write("sequenceFiles", key, entry, outputFile);

        // Count is incremented only when we keep the URL
        // maxCount may cause us to skip it.
        count++;
      }
    }
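
The per-host/domain bookkeeping above packs two counters into an int[2] per host (hostCounts) and tracks overall per-segment output in segCounts. The following is a minimal standalone sketch of that assignment logic, not part of Nutch; the method name assignSegment and its parameters are illustrative only:

    // Sketch only: mirrors the hostCount/segCounts bookkeeping of the reducer above.
    // hostCount[0] = segment currently assigned to the host/domain (1-based),
    // hostCount[1] = URLs assigned to it in that segment; segCounts[i] = URLs
    // already placed into segment i+1. Returns the target segment, or -1 to skip.
    static int assignSegment(int[] hostCount, int[] segCounts, int maxCount,
        int maxNumSegments, long limit) {
      hostCount[1]++;
      // move past segments this reducer has already filled up to 'limit'
      while (segCounts[hostCount[0] - 1] >= limit
          && hostCount[0] < maxNumSegments) {
        hostCount[0]++;
        hostCount[1] = 0;
      }
      // host exceeded its quota in the current segment: spill to the next one
      if (hostCount[1] > maxCount) {
        if (hostCount[0] < maxNumSegments) {
          hostCount[0]++;
          hostCount[1] = 1;
        } else {
          return -1; // every segment is already full for this host/domain
        }
      }
      segCounts[hostCount[0] - 1]++;
      return hostCount[0];
    }

The variable maxCount and fetch delay come from expressions evaluated against a context built from the HostDatum (createContext(host)); the execute(context) call matches the Apache Commons JEXL 3 API that Nutch uses for generate-time expressions. Below is a standalone sketch of such an evaluation, assuming JEXL 3 on the classpath; the expression and the variable name "fetches" are illustrative assumptions, not the names Nutch actually exposes:

    // Standalone sketch, assuming Apache Commons JEXL 3 on the classpath.
    // The variable name "fetches" and the expression are illustrative; the real
    // context is built by createContext(host) from HostDatum fields.
    import org.apache.commons.jexl3.JexlBuilder;
    import org.apache.commons.jexl3.JexlContext;
    import org.apache.commons.jexl3.JexlEngine;
    import org.apache.commons.jexl3.JexlScript;
    import org.apache.commons.jexl3.MapContext;

    public class MaxCountExprSketch {
      public static void main(String[] args) {
        JexlEngine jexl = new JexlBuilder().create();
        // e.g. allow more URLs per segment for hosts with a good fetch history
        JexlScript script = jexl.createScript("fetches > 1000 ? 200 : 50");
        JexlContext ctx = new MapContext();
        ctx.set("fetches", 2500); // would come from the HostDatum
        long variableMaxCount =
            Math.round(((Number) script.execute(ctx)).doubleValue());
        System.out.println("variable maxCount: " + variableMaxCount); // -> 200
      }
    }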