public void run()

in src/java/org/apache/nutch/fetcher/FetcherThread.java [248:533]


  /**
   * Main fetch loop of this thread.
   *
   * <p>Repeatedly takes a {@link FetchItem} from the shared fetch queues and
   * fetches it, honoring robots.txt rules and per-queue crawl delays. Redirects
   * are followed in-place (up to {@code maxRedirect} hops) via the inner
   * do-while loop. The thread exits when it is halted externally or when the
   * feeder is done and the queues are drained. Any item still held on exit is
   * released back to the queues in the {@code finally} block so its host queue
   * is not left blocked.
   */
  public void run() {
    activeThreads.incrementAndGet(); // count threads

    FetchItem fit = null;
    try {
      // checking for the server to be running and fetcher.parse to be true
      if (parsing && NutchServer.getInstance().isRunning())
        reportToNutchServer = true;
      
      while (true) {
        // creating FetchNode for storing in FetchNodeDb
        if (reportToNutchServer)
          this.fetchNode = new FetchNode();
        else
          this.fetchNode = null;

        // check whether must be stopped
        if (isHalted()) {
          LOG.debug("{} set to halted", getName());
          // clear fit so the finally block does not release an item we never held
          fit = null;
          return;
        }

        fit = fetchQueues.getFetchItem();
        if (fit == null) {
          if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
            LOG.debug("{} spin-waiting ...", getName());
            // spin-wait: the feeder may still add items, or queued items are
            // waiting out their per-host crawl delay
            spinWaiting.incrementAndGet();
            try {
              Thread.sleep(500);
            } catch (InterruptedException e) {
              // restore the interrupt flag so the halt/shutdown logic can see it
              Thread.currentThread().interrupt();
            }
            spinWaiting.decrementAndGet();
            continue;
          } else {
            // all done, finish this thread
            LOG.info("{} {} has no more work available", getName(),
                Thread.currentThread().getId());
            return;
          }
        }
        lastRequestStart.set(System.currentTimeMillis());

        // use the representative URL stored in the datum metadata (set by a
        // previous redirect resolution) if present, otherwise the item's URL
        Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
            Nutch.WRITABLE_REPR_URL_KEY);
        if (reprUrlWritable == null) {
          setReprUrl(fit.url.toString());
        } else {
          setReprUrl(reprUrlWritable.toString());
        }

        try {
          // fetch the page
          redirecting = false;
          redirectCount = 0;
          
          //Publisher event
          if(activatePublisher) {
            FetcherThreadEvent startEvent = new FetcherThreadEvent(PublishEventType.START, fit.getUrl().toString());
            publisher.publish(startEvent, conf);
          }
          
          // Follows redirects in place: each iteration fetches the current
          // fit, which may be replaced by queueRedirect(...) with the redirect
          // target. A `continue` inside this loop jumps to the loop condition;
          // since `redirecting` is reset to false at the top of each pass, the
          // condition fails and control returns to the outer while loop.
          do {
            if (LOG.isInfoEnabled()) {
              LOG.info("{} {} fetching {} (queue crawl delay={}ms)", getName(),
                  Thread.currentThread().getId(), fit.url,
                  fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay);
            }
            LOG.debug("redirectCount={}", redirectCount);
            redirecting = false;
            Protocol protocol = this.protocolFactory.getProtocol(fit.u);
            BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum,
                robotsTxtContent);
            if (robotsTxtContent != null) {
              outputRobotsTxt(robotsTxtContent);
              robotsTxtContent.clear();
            }
            if (rules.isDeferVisits()) {
              LOG.info("Defer visits for queue {} : {}", fit.queueID, fit.url);
              // retry the fetch item
              if (fetchQueues.timelimitExceeded()) {
                fetchQueues.finishFetchItem(fit, true);
              } else {
                fetchQueues.addFetchItem(fit);
              }
              // but check whether it's time to cancel the queue
              int killedURLs = fetchQueues.checkExceptionThreshold(
                  fit.getQueueID(), this.robotsDeferVisitsRetries + 1,
                  this.robotsDeferVisitsDelay);
              if (killedURLs != 0) {
                context
                    .getCounter("FetcherStatus", "robots_defer_visits_dropped")
                    .increment(killedURLs);
              }
              continue;
            }
            if (!rules.isAllowed(fit.url.toString())) {
              // unblock
              fetchQueues.finishFetchItem(fit, true);
              LOG.info("Denied by robots.txt: {}", fit.url);
              output(fit.url, fit.datum, null,
                  ProtocolStatus.STATUS_ROBOTS_DENIED,
                  CrawlDatum.STATUS_FETCH_GONE);
              context.getCounter("FetcherStatus", "robots_denied").increment(1);
              continue;
            }
            if (rules.getCrawlDelay() > 0) {
              if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
                // unblock
                fetchQueues.finishFetchItem(fit, true);
                LOG.info("Crawl-Delay for {} too long ({} ms), skipping",
                    fit.url, rules.getCrawlDelay());
                output(fit.url, fit.datum, null,
                    ProtocolStatus.STATUS_ROBOTS_DENIED,
                    CrawlDatum.STATUS_FETCH_GONE);
                context.getCounter("FetcherStatus",
                    "robots_denied_maxcrawldelay").increment(1);
                continue;
              } else {
                // apply the robots.txt crawl delay to the whole host queue,
                // clamped to the configured minimum
                FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
                long crawlDelay = rules.getCrawlDelay();
                if (crawlDelay < minCrawlDelay) {
                  LOG.info(
                      "Crawl-Delay for {} too short ({} ms), adjusting to {} ms",
                      fit.url, rules.getCrawlDelay(), minCrawlDelay);
                  crawlDelay = minCrawlDelay;
                }
                fiq.crawlDelay = crawlDelay;
                // fixed: format string was missing the placeholder for fit.url
                LOG.debug(
                    "Crawl delay for queue: {} is set to {} as per robots.txt. url: {}",
                    fit.queueID, fiq.crawlDelay, fit.url);
              }
            }
            ProtocolOutput output = protocol.getProtocolOutput(fit.url,
                fit.datum);
            ProtocolStatus status = output.getStatus();
            Content content = output.getContent();
            ParseStatus pstatus = null;
            // unblock queue
            fetchQueues.finishFetchItem(fit);

            // used for FetchNode
            if (fetchNode != null) {
              fetchNode.setStatus(status.getCode());
              fetchNode.setFetchTime(System.currentTimeMillis());
              fetchNode.setUrl(fit.url);
            }
            
            //Publish fetch finish event
            if(activatePublisher) {
              FetcherThreadEvent endEvent = new FetcherThreadEvent(PublishEventType.END, fit.getUrl().toString());
              endEvent.addEventData("status", status.getName());
              publisher.publish(endEvent, conf);
            }
            context.getCounter("FetcherStatus", status.getName()).increment(1);

            switch (status.getCode()) {

            case ProtocolStatus.SUCCESS: // got a page
              pstatus = output(fit.url, fit.datum, content, status,
                  CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
              updateStatus(content.getContent().length);
              // a successful parse may itself signal a (meta-refresh) redirect
              if (pstatus != null && pstatus.isSuccess()
                  && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                String newUrl = pstatus.getMessage();
                int refreshTime = Integer.parseInt(pstatus.getArgs()[1]);
                Text redirUrl = handleRedirect(fit, newUrl,
                    refreshTime < Fetcher.PERM_REFRESH_TIME,
                    Fetcher.CONTENT_REDIR);
                if (redirUrl != null) {
                  fit = queueRedirect(redirUrl, fit);
                }
              }
              break;

            case ProtocolStatus.MOVED: // redirect
            case ProtocolStatus.TEMP_MOVED:
              int code;
              boolean temp;
              if (status.getCode() == ProtocolStatus.MOVED) {
                code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
                temp = false;
              } else {
                code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
                temp = true;
              }
              output(fit.url, fit.datum, content, status, code);
              String newUrl = status.getMessage();
              Text redirUrl = handleRedirect(fit, newUrl, temp,
                  Fetcher.PROTOCOL_REDIR);
              if (redirUrl != null) {
                fit = queueRedirect(redirUrl, fit);
              } else {
                // stop redirecting
                redirecting = false;
              }
              break;

            case ProtocolStatus.EXCEPTION:
              logError(fit.url, status.getMessage());
              int killedURLs = fetchQueues
                  .checkExceptionThreshold(fit.getQueueID());
              if (killedURLs != 0)
                context.getCounter("FetcherStatus",
                    "AboveExceptionThresholdInQueue").increment(killedURLs);
              /* FALLTHROUGH */

            case ProtocolStatus.RETRY: // retry
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_RETRY);
              break;

            case ProtocolStatus.GONE: // gone
            case ProtocolStatus.NOTFOUND:
            case ProtocolStatus.ACCESS_DENIED:
            case ProtocolStatus.ROBOTS_DENIED:
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_GONE);
              break;

            case ProtocolStatus.NOTMODIFIED:
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_NOTMODIFIED);
              break;

            default:
              if (LOG.isWarnEnabled()) {
                LOG.warn("{} {} Unknown ProtocolStatus: {}", getName(),
                    Thread.currentThread().getId(), status.getCode());
              }
              output(fit.url, fit.datum, null, status,
                  CrawlDatum.STATUS_FETCH_RETRY);
            }

            if (redirecting && redirectCount > maxRedirect) {
              fetchQueues.finishFetchItem(fit);
              context.getCounter("FetcherStatus", "redirect_count_exceeded")
                  .increment(1);
              if (LOG.isInfoEnabled()) {
                LOG.info("{} {} - redirect count exceeded {} ({})", getName(),
                    Thread.currentThread().getId(), fit.url,
                    maxRedirectExceededSkip ? "skipped" : "linked");
              }
              if (maxRedirectExceededSkip) {
                // skip redirect target when redirect count is exceeded
              } else {
                // record the redirect target as a linked URL instead of
                // following it, so it can be fetched in a later round
                Text newUrl = new Text(status.getMessage());
                CrawlDatum newDatum = createRedirDatum(newUrl, fit,
                    CrawlDatum.STATUS_LINKED);
                output(newUrl, newDatum, null, null, CrawlDatum.STATUS_LINKED);
              }
            }

          } while (redirecting && (redirectCount <= maxRedirect));

        } catch (Throwable t) { // unexpected exception
          // unblock
          fetchQueues.finishFetchItem(fit);
          String message;
          if (LOG.isDebugEnabled()) {
            message = StringUtils.stringifyException(t);
          } else if (logUtil.logShort(t)) {
            // known noisy exception type: log only the class name
            message = t.getClass().getName();
          } else {
            message = StringUtils.stringifyException(t);
          }
          logError(fit.url, message);
          output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
              CrawlDatum.STATUS_FETCH_RETRY);
        }
      }

    } catch (Throwable e) {
      if (LOG.isErrorEnabled()) {
        LOG.error("fetcher caught:", e);
      }
    } finally {
      // release any item still held so its host queue is unblocked
      if (fit != null) {
        fetchQueues.finishFetchItem(fit);
      }
      activeThreads.decrementAndGet(); // count threads
      LOG.info("{} {} -finishing thread {}, activeThreads={}", getName(),
          Thread.currentThread().getId(), getName(), activeThreads);
    }
  }