public void run()

in src/java/org/apache/nutch/fetcher/QueueFeeder.java [83:175]


  public void run() {
    boolean hasMore = true;
    int cnt = 0;
    int[] queuingStatus = new int[QueuingStatus.values().length];
    while (hasMore) {
      if (queues.timelimitExceeded() || queues.timoutReached) {
        // enough ... let's simply read all remaining entries from the input
        // without processing them
        if (queues.timoutReached) {
          int qstatus = QueuingStatus.HIT_BY_TIMEOUT.ordinal();
          if (queuingStatus[qstatus] == 0) {
            LOG.info("QueueFeeder stopping, timeout reached.");
          }
          queuingStatus[qstatus]++;
          context.getCounter("FetcherStatus", "hitByTimeout").increment(1);
        } else {
          int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
          if (queuingStatus[qstatus] == 0) {
            LOG.info("QueueFeeder stopping, timelimit exceeded.");
          }
          queuingStatus[qstatus]++;
          context.getCounter("FetcherStatus", "hitByTimeLimit").increment(1);
        }
        try {
          hasMore = context.nextKeyValue();
        } catch (IOException e) {
          LOG.error("QueueFeeder error reading input, record " + cnt, e);
          return;
        } catch (InterruptedException e) {
          LOG.info("QueueFeeder interrupted, exception:", e);
          return;
        }
        continue;
      }
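      // how many more items fit into the queues before reaching capacity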
      int feed = size - queues.getTotalSize();
      if (feed <= 0) {
        // queues are full - sleep briefly and re-check until they have some
        // free space
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // ignore: the loop re-checks the queue capacity
        }
        continue;
      }
      LOG.debug("-feeding {} input urls ...", feed);
      while (feed > 0 && hasMore) {
        try {
          hasMore = context.nextKeyValue();
          if (hasMore) {
            Text url = context.getCurrentKey();
            /*
             * Need to copy key and value objects because MapReduce will reuse
             * the original objects while the objects are stored in the queue.
             */
            if (urlFilters != null || urlNormalizers != null) {
              String u = filterNormalize(url.toString());
              if (u == null) {
                // filtered or failed to normalize
                context.getCounter("FetcherStatus", "filtered").increment(1);
                continue;
              }
              url = new Text(u);
            } else {
              url = new Text(url);
            }
            CrawlDatum datum = new CrawlDatum();
            datum.set(context.getCurrentValue());
            QueuingStatus status = queues.addFetchItem(url, datum);
            queuingStatus[status.ordinal()]++;
            if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
              context
                  .getCounter("FetcherStatus", "AboveExceptionThresholdInQueue")
                  .increment(1);
            }
            cnt++;
            feed--;
          }
        } catch (IOException e) {
          LOG.error("QueueFeeder error reading input, record " + cnt, e);
          return;
        } catch (InterruptedException e) {
          LOG.info("QueueFeeder interrupted, exception:", e);
        }
      }
    }
    // signal queues that no more new fetch items are added
    queues.feederAlive = false;
    LOG.info("QueueFeeder finished: total {} records", cnt);
    LOG.info("QueueFeeder queuing status:");
    for (QueuingStatus status : QueuingStatus.values()) {
      LOG.info("\t{}\t{}", queuingStatus[status.ordinal()], status);
    }
  }
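
run() delegates URL cleanup to the private filterNormalize(String) helper, which is defined elsewhere in QueueFeeder.java and not part of the excerpt above. As an assumption about its shape, a conventional normalize-then-filter helper in Nutch would look roughly like the sketch below; it presumes the enclosing class's urlNormalizers, urlFilters, and LOG fields, and returns null when the URL is rejected or malformed:

  // Assumed shape of the private filterNormalize(String) helper called in
  // run(): apply the configured normalizers first, then the filters, and
  // map any rejection or malformed URL to null.
  private String filterNormalize(String url) {
    if (url != null) {
      try {
        if (urlNormalizers != null) {
          url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_FETCHER);
        }
        if (urlFilters != null) {
          url = urlFilters.filter(url); // returns null if rejected
        }
      } catch (Exception e) {
        LOG.warn("Skipping {}: {}", url, e.getMessage());
        url = null;
      }
    }
    return url;
  }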
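
For context, the per-status bookkeeping above relies on QueuingStatus being a plain enum whose ordinal() indexes the count array. A minimal, runnable sketch of that pattern follows; the enum lists only the constants referenced in this method, so it is an assumption that these are exactly the values the real FetchItemQueues.QueuingStatus defines (it may well define more):

  // Minimal sketch: tallying enum-valued statuses in an ordinal-indexed
  // array. Only the constants referenced in run() are listed; the real
  // FetchItemQueues.QueuingStatus in Nutch may define additional values.
  enum QueuingStatus {
    SUCCESSFULLY_QUEUED, // assumed "happy path" constant, not shown in run()
    HIT_BY_TIMEOUT,
    HIT_BY_TIMELIMIT,
    ABOVE_EXCEPTION_THRESHOLD
  }

  public class QueuingStatusTally {
    public static void main(String[] args) {
      // one counter slot per enum constant, indexed by ordinal()
      int[] tally = new int[QueuingStatus.values().length];
      tally[QueuingStatus.SUCCESSFULLY_QUEUED.ordinal()] += 2;
      tally[QueuingStatus.HIT_BY_TIMEOUT.ordinal()]++;
      // same summary format as the end of run()
      for (QueuingStatus status : QueuingStatus.values()) {
        System.out.printf("\t%d\t%s%n", tally[status.ordinal()], status);
      }
    }
  }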
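
The queues.feederAlive = false assignment at the end is the shutdown handshake between this producer thread and the fetcher threads that consume from the queues. The sketch below illustrates that handshake in isolation; the FeederHandshake class, the BlockingQueue, and the capacity of 100 are illustrative stand-ins, not Nutch's actual FetchItemQueues:

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.TimeUnit;

  // Illustrative stand-in for the feeder/consumer handshake: a bounded
  // queue plus a volatile "feeder still running" flag, mirroring
  // queues.feederAlive in the method above.
  public class FeederHandshake {
    static final BlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(100);
    static volatile boolean feederAlive = true;

    public static void main(String[] args) throws InterruptedException {
      Thread feeder = new Thread(() -> {
        for (int i = 0; i < 500; i++) {
          try {
            QUEUE.put("record-" + i); // blocks while the queue is full
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
          }
        }
        feederAlive = false; // signal: no more items will be added
      });
      feeder.start();

      // A consumer keeps polling while the feeder runs or items remain;
      // the flag is only cleared after the last put(), so no item is lost.
      int consumed = 0;
      while (feederAlive || !QUEUE.isEmpty()) {
        String item = QUEUE.poll(100, TimeUnit.MILLISECONDS);
        if (item != null) {
          consumed++;
        }
      }
      System.out.println("consumed " + consumed + " items");
    }
  }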