private ParseStatus output()

in src/java/org/apache/nutch/fetcher/FetcherThread.java [661:889]


  /**
   * Records the outcome of fetching one URL: updates the {@link CrawlDatum},
   * optionally parses the content, writes datum, content and parse output to
   * the job context and, when outlink following is enabled, queues outlinks
   * of the parsed documents as new fetch items.
   *
   * @param key
   *          URL of the fetched item, used as output key for datum and content
   * @param datum
   *          crawl datum of the item; status, fetch time, protocol status,
   *          content type and signature are set on it here
   * @param content
   *          fetched content, may be {@code null} (then nothing is parsed and
   *          only the datum is written)
   * @param pstatus
   *          protocol status to store in the datum metadata, may be
   *          {@code null}
   * @param status
   *          {@link CrawlDatum} status code to set on the datum; parsing and
   *          signature calculation only happen for
   *          {@link CrawlDatum#STATUS_FETCH_SUCCESS}
   * @param outlinkDepth
   *          depth of this item in the outlink-following chain; outlinks are
   *          queued with {@code outlinkDepth + 1}
   * @return the status of the parse stored under {@code content.getUrl()}, or
   *         {@code null} if nothing was parsed or the parse result holds no
   *         entry for that URL
   * @throws InterruptedException
   *           propagated from the calls made while writing output or queueing
   *           follow-up items; it is not handled here
   */
  private ParseStatus output(Text key, CrawlDatum datum, Content content,
      ProtocolStatus pstatus, int status, int outlinkDepth) throws InterruptedException{

    datum.setStatus(status);
    datum.setFetchTime(System.currentTimeMillis());
    if (pstatus != null)
      datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);

    ParseResult parseResult = null;
    if (content != null) {
      Metadata metadata = content.getMetadata();

      // store the guessed content type in the crawldatum
      if (content.getContentType() != null)
        datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
            new Text(content.getContentType()));

      // add segment to metadata
      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
      // add score to content metadata so that ParseSegment can pick it up.
      // A failing scoring filter is logged and otherwise ignored (best
      // effort).
      try {
        scfilters.passScoreBeforeParsing(key, datum, content);
      } catch (Exception e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
              Thread.currentThread().getId(), key, e);
        }
      }

      if (status == CrawlDatum.STATUS_FETCH_SUCCESS) {
        // Parse only if parsing is enabled and the content is not a
        // truncated document that should be skipped.
        if (parsing && !(skipTruncated && ParseSegment.isTruncated(content))) {
          try {
            parseResult = this.parseUtil.parse(content);
          } catch (Exception e) {
            LOG.warn("{} {} Error parsing: {}: {}", getName(),
                Thread.currentThread().getId(), key,
                StringUtils.stringifyException(e));
          }
        }

        // No parse result available: calculate the signature against an
        // empty parse so the datum still carries one.
        if (parseResult == null && (parsing || signatureWithoutParsing)) {
          byte[] signature = SignatureFactory.getSignature(conf)
              .calculate(content, new ParseStatus().getEmptyParse(conf));
          datum.setSignature(signature);
        }
      }

      /*
       * Store status code in content So we can read this value during parsing
       * (as a separate job) and decide to parse or not.
       */
      content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
          Integer.toString(status));
    }

    try {
      context.write(key, new NutchWritable(datum));
      if (content != null && storingContent)
        context.write(key, new NutchWritable(content));
      if (parseResult != null) {
        for (Entry<Text, Parse> entry : parseResult) {
          Text url = entry.getKey();
          Parse parse = entry.getValue();
          ParseStatus parseStatus = parse.getData().getStatus();
          // Note: parseData keeps pointing to the data of the original
          // parse, even if "parse" is replaced by an empty parse below.
          ParseData parseData = parse.getData();

          if (!parseStatus.isSuccess()) {
            LOG.warn("{} {} Error parsing: {}: {}", getName(),
                Thread.currentThread().getId(), key, parseStatus);
            parse = parseStatus.getEmptyParse(conf);
          }

          // Calculate page signature. For non-parsing fetchers this will
          // be done in ParseSegment
          byte[] signature = SignatureFactory.getSignature(conf)
              .calculate(content, parse);
          // Ensure segment name and score are in parseData metadata
          parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
          parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
              StringUtil.toHexString(signature));
          // Pass fetch time to content meta
          parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
              Long.toString(datum.getFetchTime()));
          // Only the parse of the fetched URL itself defines the signature
          // of the datum
          if (url.equals(key))
            datum.setSignature(signature);
          try {
            scfilters.passScoreAfterParsing(url, content, parse);
          } catch (Exception e) {
            if (LOG.isWarnEnabled()) {
              // log the URL the filter was actually called for; it differs
              // from key if the parse result holds multiple documents
              LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
                  Thread.currentThread().getId(), url, e);
            }
          }

          String origin = null;

          // collect outlinks for subsequent db update
          Outlink[] links = parseData.getOutlinks();
          int outlinksToStore = Math.min(maxOutlinks, links.length);
          if (ignoreExternalLinks || ignoreInternalLinks) {
            // A malformed URL raises MalformedURLException (an IOException)
            // which is handled by the catch block at the end of this method.
            URL originURL = new URL(url.toString());
            // based on domain?
            if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
              origin = URLUtil.getDomainName(originURL).toLowerCase();
            }
            // use host
            else {
              origin = originURL.getHost().toLowerCase();
            }
          }

          // used by fetchNode (receives the unfiltered outlinks)
          if (fetchNode != null) {
            fetchNode.setOutlinks(links);
            fetchNode.setTitle(parseData.getTitle());
            FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(), fetchNode);
          }
          int validCount = 0;

          // Process all outlinks, normalize and filter them. outlinkList
          // keeps the accepted Outlink objects in their original order,
          // outlinks holds the distinct target URLs.
          List<Outlink> outlinkList = new ArrayList<>(outlinksToStore);
          HashSet<String> outlinks = new HashSet<>(outlinksToStore);
          for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
            String toUrl = links[i].getToUrl();

            if (toUrl.length() > maxOutlinkLength) {
              continue;
            }
            toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
                origin, ignoreInternalLinks, ignoreExternalLinks,
                ignoreExternalLinksMode, urlFiltersForOutlinks,
                urlExemptionFilters, normalizersForOutlinks);
            if (toUrl == null) {
              // rejected by filters or normalizers
              continue;
            }

            validCount++;
            links[i].setUrl(toUrl);
            outlinkList.add(links[i]);
            outlinks.add(toUrl);
          }

          // Publish fetch report event
          if (activatePublisher) {
            FetcherThreadEvent reportEvent = new FetcherThreadEvent(PublishEventType.REPORT, url.toString());
            reportEvent.addOutlinksToEventData(outlinkList);
            reportEvent.addEventData(Nutch.FETCH_EVENT_TITLE, parseData.getTitle());
            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTTYPE, parseData.getContentMeta().get("content-type"));
            reportEvent.addEventData(Nutch.FETCH_EVENT_SCORE, datum.getScore());
            reportEvent.addEventData(Nutch.FETCH_EVENT_FETCHTIME, datum.getFetchTime());
            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG, parseData.getContentMeta().get("content-language"));
            publisher.publish(reportEvent, conf);
          }

          // Only process depth N outlinks
          if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth
              && !fetchQueues.timelimitExceeded()) {
            FetchItem ft = FetchItem.create(url, null, queueMode);
            FetchItemQueue queue = fetchQueues.getFetchItemQueue(ft.queueID);
            // remember the current page so that links back to it are not
            // followed
            queue.alreadyFetched.add(url.toString().hashCode());

            context.getCounter("FetcherOutlinks", "outlinks_detected").increment(
                outlinks.size());

            // Counter to limit num outlinks to follow per page
            int outlinkCounter = 0;

            String followUrl;

            // Host of the page the outlinks were found on. It does not
            // change inside the loop, so look it up once. URLUtil.getHost is
            // documented to return null for a malformed URL, which must not
            // be dereferenced.
            String sourceHost = outlinksIgnoreExternal
                ? URLUtil.getHost(url.toString()) : null;

            // Walk over the outlinks and add as new FetchItem to the queues
            Iterator<String> iter = outlinks.iterator();
            while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
              followUrl = iter.next();

              // Check whether we'll follow external outlinks. If the host
              // of the source page is unknown, no outlink can be identified
              // as internal and none is followed.
              if (outlinksIgnoreExternal) {
                if (sourceHost == null
                    || !sourceHost.equals(URLUtil.getHost(followUrl))) {
                  continue;
                }
              }

              // Already followed?
              int urlHashCode = followUrl.hashCode();
              if (queue.alreadyFetched.contains(urlHashCode)) {
                continue;
              }
              queue.alreadyFetched.add(urlHashCode);

              // Create new FetchItem with depth incremented
              FetchItem fit = FetchItem.create(new Text(followUrl),
                  new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
                  queueMode, outlinkDepth + 1);

              context.getCounter("FetcherOutlinks", "outlinks_following").increment(1);

              fetchQueues.addFetchItem(fit);

              outlinkCounter++;
            }
          }

          // Overwrite the outlinks in ParseData with the normalized and
          // filtered set
          parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
              .size()]));

          context.write(url, new NutchWritable(new ParseImpl(new ParseText(
              parse.getText()), parseData, parse.isCanonical())));
        }
      }
    } catch (IOException e) {
      // Logged only: output written before the failure stays written, the
      // remaining output for this item is skipped.
      if (LOG.isErrorEnabled()) {
        LOG.error("fetcher caught:", e);
      }
    }

    // return parse status (of the "original" URL if the ParseResult contains
    // multiple parses) which allows Fetcher to follow meta-redirects
    if (parseResult != null && !parseResult.isEmpty()) {
      Parse p = parseResult.get(content.getUrl());
      if (p != null) {
        context.getCounter("ParserStatus", ParseStatus.majorCodes[p
            .getData().getStatus().getMajorCode()]).increment(1);
        return p.getData().getStatus();
      }
    }
    return null;
  }