public void run()

in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [472:791]
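Fetch loop executed by each fetcher thread: it polls the shared fetch queues, enforces robots.txt rules and per-queue crawl delays, fetches the URL through the configured protocol implementation, reports metrics, and routes the result to the default or status stream depending on the outcome.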


        public void run() {
            while (true) {
                FetchItem fit = fetchQueues.getFetchItem();
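                // getFetchItem() returns null when no queue currently has an
                // item available (queues empty or politeness delay not yet
                // elapsed)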
                if (fit == null) {
                    LOG.trace("{} spin-waiting ...", getName());
                    // spin-wait.
                    spinWaiting.incrementAndGet();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        LOG.error("{} caught interrupted exception", getName());
                        Thread.currentThread().interrupt();
                    }
                    spinWaiting.decrementAndGet();
                    continue;
                }

                activeThreads.incrementAndGet(); // count threads

                beingFetched[threadNum] = fit.url;

                LOG.debug(
                        "[Fetcher #{}] {}  => activeThreads={}, spinWaiting={}, queueID={}",
                        taskID,
                        getName(),
                        activeThreads,
                        spinWaiting,
                        fit.queueID);

                LOG.debug("[Fetcher #{}] {} : Fetching {}", taskID, getName(), fit.url);

                Metadata metadata = null;

                if (fit.t.contains("metadata")) {
                    metadata = (Metadata) fit.t.getValueByField("metadata");
                }
                if (metadata == null) {
                    metadata = new Metadata();
                }

                // https://github.com/apache/incubator-stormcrawler/issues/813
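                // clear any stale failure reason (fetch.exception) left over
                // from a previous fetch attempt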
                metadata.remove("fetch.exception");

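                // when true, the fetch queue is released without applying the
                // politeness delay, as no request was actually made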
                boolean asap = false;

                try {
                    URL url = new URL(fit.url);
                    Protocol protocol = protocolFactory.getProtocol(url);

                    if (protocol == null) {
                        throw new RuntimeException(
                                "No protocol implementation found for " + fit.url);
                    }

                    BaseRobotRules rules = protocol.getRobotRules(fit.url);
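                    // a RobotRules instance with no recorded content length
                    // was answered from the robots cache rather than fetched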
                    boolean fromCache = false;
                    if (rules instanceof RobotRules
                            && ((RobotRules) rules).getContentLengthFetched().length == 0) {
                        fromCache = true;
                        eventCounter.scope("robots.fromCache").incrBy(1);
                    } else {
                        eventCounter.scope("robots.fetched").incrBy(1);
                    }

                    // autodiscovery of sitemaps
                    // the sitemaps are sent down the topology only
                    // if the robots file did not come from the cache,
                    // to avoid sending them unnecessarily

                    // check in the metadata whether the discovery setting has
                    // been overridden

                    String localSitemapDiscoveryVal =
                            metadata.getFirstValue(SITEMAP_DISCOVERY_PARAM_KEY);

                    boolean smautodisco;

                    if ("true".equalsIgnoreCase(localSitemapDiscoveryVal)) {
                        smautodisco = true;
                    } else if ("false".equalsIgnoreCase(localSitemapDiscoveryVal)) {
                        smautodisco = false;
                    } else {
                        smautodisco = sitemapsAutoDiscovery;
                    }

                    if (!fromCache && smautodisco) {
                        for (String sitemapURL : rules.getSitemaps()) {
                            if (rules.isAllowed(sitemapURL)) {
                                emitOutlink(
                                        fit.t,
                                        url,
                                        sitemapURL,
                                        metadata,
                                        SiteMapParserBolt.isSitemapKey,
                                        "true");
                            }
                        }
                    }

                    // record whether sitemaps were found
                    // https://github.com/apache/incubator-stormcrawler/issues/710
                    // note: we don't care whether the sitemap URLs were
                    // actually kept
                    boolean foundSitemap = (rules.getSitemaps().size() > 0);
                    metadata.setValue(
                            SiteMapParserBolt.foundSitemapKey, Boolean.toString(foundSitemap));

                    if (!rules.isAllowed(fit.url)) {
                        LOG.info("Denied by robots.txt: {}", fit.url);
                        // pass the info about denied by robots
                        metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");
                        collector.emit(
                                org.apache.stormcrawler.Constants.StatusStreamName,
                                fit.t,
                                new Values(fit.url, metadata, Status.ERROR));
                        // no need to wait next time as we won't request from
                        // that site
                        asap = true;
                        continue;
                    }
                    FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID, metadata);
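                    // reconcile the Crawl-Delay advertised by robots.txt with
                    // the delay currently applied to this queue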
                    if (rules.getCrawlDelay() > 0 && rules.getCrawlDelay() != fiq.crawlDelay) {
                        if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
                            boolean force = false;
                            String msg = "skipping";
                            if (maxCrawlDelayForce) {
                                force = true;
                                msg = "using value of fetcher.max.crawl.delay instead";
                            }
                            LOG.info(
                                    "Crawl-Delay for {} too long ({}), {}",
                                    fit.url,
                                    rules.getCrawlDelay(),
                                    msg);
                            if (force) {
                                fiq.crawlDelay = maxCrawlDelay;
                            } else {
                                // pass the info about crawl delay
                                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "crawl_delay");
                                collector.emit(
                                        org.apache.stormcrawler.Constants.StatusStreamName,
                                        fit.t,
                                        new Values(fit.url, metadata, Status.ERROR));
                                // no need to wait next time as we won't request
                                // from that site
                                asap = true;
                                continue;
                            }
                        } else if (rules.getCrawlDelay() < fetchQueues.crawlDelay
                                && crawlDelayForce) {
                            fiq.crawlDelay = fetchQueues.crawlDelay;
                            LOG.info(
                                    "Crawl delay for {} too short ({}), set to fetcher.server.delay",
                                    fit.url,
                                    rules.getCrawlDelay());
                        } else {
                            fiq.crawlDelay = rules.getCrawlDelay();
                            LOG.info(
                                    "Crawl delay for queue {} set to {} as per robots.txt. url: {}",
                                    fit.queueID,
                                    fiq.crawlDelay,
                                    fit.url);
                        }
                    }

                    long start = System.currentTimeMillis();
                    long timeInQueues = start - fit.creationTime;

                    // the tuple has been in the queue for so long that it has
                    // already been failed by the timeout - let's not fetch it
                    if (timeoutInQueues != -1 && timeInQueues > timeoutInQueues * 1000) {
                        LOG.info(
                                "[Fetcher #{}] Waited in queue for too long - {}", taskID, fit.url);
                        // no need to apply the politeness delay as we did not
                        // hit the server at all
                        asap = true;
                        continue;
                    }

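                    // the actual fetch: delegates to the protocol
                    // implementation (e.g. HTTP client) and blocks until the
                    // response or an exception comes back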
                    ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata);

                    long timeFetching = System.currentTimeMillis() - start;

                    final int byteLength = response.getContent().length;

                    // report any metrics found in the protocol metadata,
                    // stripping the "metrics." prefix; the values are expected
                    // to parse as Longs
                    response.getMetadata()
                            .keySet("metrics.")
                            .forEach(
                                    s ->
                                            averagedMetrics
                                                    .scope(s.substring("metrics.".length()))
                                                    .update(
                                                            Long.parseLong(
                                                                    response.getMetadata()
                                                                            .getFirstValue(s))));

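                    // update the running fetch metrics: timings, byte counts
                    // and event counters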
                    averagedMetrics.scope("fetch_time").update(timeFetching);
                    averagedMetrics.scope("time_in_queues").update(timeInQueues);
                    averagedMetrics.scope("bytes_fetched").update(byteLength);
                    perSecMetrics.scope("bytes_fetched_perSec").update(byteLength);
                    perSecMetrics.scope("fetched_perSec").update(1);
                    eventCounter.scope("fetched").incrBy(1);
                    eventCounter.scope("bytes_fetched").incrBy(byteLength);

                    LOG.info(
                            "[Fetcher #{}] Fetched {} with status {} in msec {}",
                            taskID,
                            fit.url,
                            response.getStatusCode(),
                            timeFetching);

                    // merge the original metadata with the metadata returned
                    // by the protocol
                    Metadata mergedMD = new Metadata();
                    mergedMD.putAll(metadata);

                    // prefix the protocol metadata to avoid key clashes and to
                    // preserve protocol metadata persisted or transferred from
                    // previous fetches
                    mergedMD.putAll(response.getMetadata(), protocolMDprefix);

                    mergedMD.setValue(
                            "fetch.statusCode", Integer.toString(response.getStatusCode()));

                    mergedMD.setValue("fetch.byteLength", Integer.toString(byteLength));

                    mergedMD.setValue("fetch.loadingTime", Long.toString(timeFetching));

                    mergedMD.setValue("fetch.timeInQueues", Long.toString(timeInQueues));

                    // determine the status based on the status code
                    final Status status = Status.fromHTTPCode(response.getStatusCode());

                    eventCounter.scope("status_" + response.getStatusCode()).incrBy(1);

                    final Values tupleToSend = new Values(fit.url, mergedMD, status);

                    // if the status is OK emit on default stream
                    if (status.equals(Status.FETCHED)) {
                        if (response.getStatusCode() == 304) {
                            // mark this URL as fetched so that it gets
                            // rescheduled
                            // but do not try to parse or index
                            collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
                        } else {
                            // send content for parsing
                            collector.emit(
                                    Utils.DEFAULT_STREAM_ID,
                                    fit.t,
                                    new Values(fit.url, response.getContent(), mergedMD));
                        }
                    } else if (status.equals(Status.REDIRECTION)) {

                        // find the URL it redirects to
                        String redirection =
                                response.getMetadata().getFirstValue(HttpHeaders.LOCATION);

                        // store the URL it redirects to; used mainly for
                        // debugging - the target URL is not resolved
                        if (StringUtils.isNotBlank(redirection)) {
                            mergedMD.setValue("_redirTo", redirection);
                        }

                        // https://github.com/apache/incubator-stormcrawler/issues/954
                        if (allowRedirs() && StringUtils.isNotBlank(redirection)) {
                            emitOutlink(fit.t, url, redirection, mergedMD);
                        }

                        // mark this URL as redirected
                        collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
                    }
                    // error
                    else {
                        collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
                    }

                } catch (Exception exece) {
                    String message = exece.getMessage();
                    if (message == null) {
                        message = "";
                    }

                    // common exceptions for which we log only a short message
                    if (exece.getCause() instanceof java.util.concurrent.TimeoutException
                            || message.contains(" timed out")) {
                        LOG.info("Socket timeout fetching {}", fit.url);
                        message = "Socket timeout fetching";
                    } else if (exece.getCause() instanceof java.net.UnknownHostException
                            || exece instanceof java.net.UnknownHostException) {
                        LOG.info("Unknown host {}", fit.url);
                        message = "Unknown host";
                    } else {
                        message = exece.getClass().getName();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Exception while fetching {}", fit.url, exece);
                        } else {
                            LOG.info("Exception while fetching {} -> {}", fit.url, message);
                        }
                    }

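                    // the metadata taken from the tuple may be an immutable
                    // empty instance - swap in a fresh one so the failure
                    // reason can be added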
                    if (metadata.size() == 0) {
                        metadata = new Metadata();
                    }
                    // add the reason of the failure in the metadata
                    metadata.setValue("fetch.exception", message);

                    // send to status stream
                    collector.emit(
                            Constants.StatusStreamName,
                            fit.t,
                            new Values(fit.url, metadata, Status.FETCH_ERROR));

                    eventCounter.scope("exception").incrBy(1);
                } finally {
                    fetchQueues.finishFetchItem(fit, asap);
                    activeThreads.decrementAndGet(); // count threads
                    // ack it whatever happens
                    collector.ack(fit.t);
                    beingFetched[threadNum] = "";
                }
            }
        }
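
For illustration, the per-URL override read near the top of the loop (SITEMAP_DISCOVERY_PARAM_KEY) can be set on the metadata of an incoming tuple. Below is a minimal sketch, assuming the key resolves to "sitemap.discovery" as defined in SiteMapParserBolt; the class name and the surrounding spout/bolt wiring are hypothetical:

    import org.apache.stormcrawler.Metadata;

    public class SitemapOverrideSketch {
        public static void main(String[] args) {
            // force sitemap autodiscovery for this URL only, overriding the
            // topology-wide sitemapsAutoDiscovery default checked in run()
            Metadata md = new Metadata();
            md.setValue("sitemap.discovery", "true");
            // a value of "false" would disable discovery for this URL instead
            System.out.println(md);
        }
    }

Any other value (or no value at all) falls back to the topology-wide sitemapsAutoDiscovery flag, as the if/else chain in run() shows.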