public void execute(Tuple input)

in core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java [242:556]


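    /**
     * Fetches the URL carried by the incoming tuple: enforces robots.txt rules and
     * per-server politeness delays, fetches the content, and emits the result on the
     * default, status or throttle stream depending on the outcome.
     */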
    public void execute(Tuple input) {

        String urlString = input.getStringByField("url");
        if (StringUtils.isBlank(urlString)) {
            LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, input);
            // nothing to fetch - ack and move on
            collector.ack(input);
            return;
        }

        Metadata metadata = null;

        if (input.contains("metadata")) metadata = (Metadata) input.getValueByField("metadata");
        if (metadata == null) {
            metadata = new Metadata();
        }

        // https://github.com/apache/incubator-stormcrawler/issues/813
        metadata.remove("fetch.exception");

        URL url;

        try {
            url = new URL(urlString);
        } catch (MalformedURLException e) {
            LOG.error("{} is a malformed URL", urlString);
            // Report to status stream and ack
            metadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
            collector.emit(
                    org.apache.stormcrawler.Constants.StatusStreamName,
                    input,
                    new Values(urlString, metadata, Status.ERROR));
            collector.ack(input);
            return;
        }

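        // politeness key: the host, domain or IP of the URL, depending on the configuration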
        String key = getPolitenessKey(url);
        long delay = 0;

        try {
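            // count this tuple as active while the robots rules are resolved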
            activeThreads.incrementAndGet();

            Protocol protocol = protocolFactory.getProtocol(url);

            BaseRobotRules rules = protocol.getRobotRules(urlString);
            boolean fromCache = false;
            if (rules instanceof RobotRules
                    && ((RobotRules) rules).getContentLengthFetched().length == 0) {
                fromCache = true;
                eventCounter.scope("robots.fromCache").incrBy(1);
            } else {
                eventCounter.scope("robots.fetched").incrBy(1);
            }

            // autodiscovery of sitemaps
            // the sitemaps will be sent down the topology
            // if the robots file did not come from the cache,
            // to avoid sending them unnecessarily

            // check in the metadata if discovery setting has been
            // overridden
            String localSitemapDiscoveryVal = metadata.getFirstValue(SITEMAP_DISCOVERY_PARAM_KEY);

            boolean smautodisco;
            if ("true".equalsIgnoreCase(localSitemapDiscoveryVal)) {
                smautodisco = true;
            } else if ("false".equalsIgnoreCase(localSitemapDiscoveryVal)) {
                smautodisco = false;
            } else {
                smautodisco = sitemapsAutoDiscovery;
            }

            if (!fromCache && smautodisco) {
                for (String sitemapURL : rules.getSitemaps()) {
                    if (rules.isAllowed(sitemapURL)) {
                        emitOutlink(
                                input,
                                url,
                                sitemapURL,
                                metadata,
                                SiteMapParserBolt.isSitemapKey,
                                "true");
                    }
                }
            }

            // record whether the robots file declared any sitemaps
            // https://github.com/apache/incubator-stormcrawler/issues/710
            // note: we don't care whether the sitemap URLs were actually
            // kept
            boolean foundSitemap = !rules.getSitemaps().isEmpty();
            metadata.setValue(SiteMapParserBolt.foundSitemapKey, Boolean.toString(foundSitemap));

            activeThreads.decrementAndGet();

            if (!rules.isAllowed(urlString)) {
                LOG.info("Denied by robots.txt: {}", urlString);

                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");

                // Report to status stream and ack
                collector.emit(
                        org.apache.stormcrawler.Constants.StatusStreamName,
                        input,
                        new Values(urlString, metadata, Status.ERROR));
                collector.ack(input);
                return;
            }

            // check when we are allowed to process it
            long timeWaiting = 0;

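            // look up the earliest time at which the next fetch for this key is allowed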
            Long timeAllowed = throttler.getIfPresent(key);

            if (timeAllowed != null) {
                long now = System.currentTimeMillis();
                long timeToWait = timeAllowed - now;
                if (timeToWait > 0) {
                    // too long -> send it to the back of the internal queue
                    if (maxThrottleSleepMSec != -1 && timeToWait > maxThrottleSleepMSec) {
                        collector.emitDirect(
                                this.taskID,
                                THROTTLE_STREAM,
                                input,
                                new Values(urlString, metadata));
                        collector.ack(input);
                        LOG.debug("[Fetcher #{}] sent back to the queue {}", taskID, urlString);
                        eventCounter.scope("sentBackToQueue").incrBy(1);
                        return;
                    }
                    // not too much of a wait - sleep here
                    timeWaiting = timeToWait;
                    try {
                        Thread.sleep(timeToWait);
                    } catch (InterruptedException e) {
                        LOG.error("[Fetcher #{}] caught InterruptedException caught while waiting");
                        Thread.currentThread().interrupt();
                    }
                }
            }

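            // start from the configured default delay; robots.txt may override it below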
            delay = this.crawlDelay;

            // get the delay from robots
            // value is negative when not set
            long robotsDelay = rules.getCrawlDelay();
            if (robotsDelay > 0) {
                if (robotsDelay > maxCrawlDelay) {
                    if (maxCrawlDelayForce) {
                        // cap the value to a maximum
                        // as some sites specify ridiculous values
                        LOG.debug("Delay from robots capped at {} for {}", robotsDelay, url);
                        delay = maxCrawlDelay;
                    } else {
                        LOG.debug(
                                "Skipped URL from queue with overlong crawl-delay ({}): {}",
                                robotsDelay,
                                url);
                        metadata.setValue(Constants.STATUS_ERROR_CAUSE, "crawl_delay");
                        collector.emit(
                                org.apache.stormcrawler.Constants.StatusStreamName,
                                input,
                                new Values(urlString, metadata, Status.ERROR));
                        collector.ack(input);
                        return;
                    }
                } else if (robotsDelay < crawlDelay && crawlDelayForce) {
                    LOG.debug(
                            "Crawl delay for {} too short ({}), set to fetcher.server.delay",
                            url,
                            robotsDelay);
                    delay = crawlDelay;
                } else {
                    delay = robotsDelay;
                }
            }

            LOG.debug("[Fetcher #{}] : Fetching {}", taskID, urlString);

            activeThreads.incrementAndGet();

            long start = System.currentTimeMillis();
            ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata);
            long timeFetching = System.currentTimeMillis() - start;

            final int byteLength = response.getContent().length;

            // get any metrics from the protocol metadata
            response.getMetadata()
                    .keySet("metrics.")
                    .forEach(
                            s ->
                                    averagedMetrics
                                            .scope(s.substring("metrics.".length()))
                                            .update(
                                                    Long.parseLong(
                                                            response.getMetadata()
                                                                    .getFirstValue(s))));

            averagedMetrics.scope("wait_time").update(timeWaiting);
            averagedMetrics.scope("fetch_time").update(timeFetching);
            averagedMetrics.scope("bytes_fetched").update(byteLength);
            eventCounter.scope("fetched").incrBy(1);
            eventCounter.scope("bytes_fetched").incrBy(byteLength);
            perSecMetrics.scope("bytes_fetched_perSec").update(byteLength);
            perSecMetrics.scope("fetched_perSec").update(1);

            LOG.info(
                    "[Fetcher #{}] Fetched {} with status {} in {} msec after waiting {} msec",
                    taskID,
                    urlString,
                    response.getStatusCode(),
                    timeFetching,
                    timeWaiting);

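            // merge the tuple metadata with the metadata returned by the protocol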
            Metadata mergedMD = new Metadata();
            mergedMD.putAll(metadata);

            // add a prefix to avoid confusion, preserve protocol metadata
            // persisted or transferred from previous fetches
            mergedMD.putAll(response.getMetadata(), protocolMDprefix);

            mergedMD.setValue("fetch.statusCode", Integer.toString(response.getStatusCode()));

            mergedMD.setValue("fetch.loadingTime", Long.toString(timeFetching));

            mergedMD.setValue("fetch.byteLength", Integer.toString(byteLength));

            // determine the status based on the status code
            final Status status = Status.fromHTTPCode(response.getStatusCode());

            eventCounter.scope("status_" + response.getStatusCode()).incrBy(1);

            // used when sending to status stream
            final Values values4status = new Values(urlString, mergedMD, status);

            // if the status is OK emit on default stream
            if (status.equals(Status.FETCHED)) {
                if (response.getStatusCode() == 304) {
                    // mark this URL as fetched so that it gets
                    // rescheduled
                    // but do not try to parse or index
                    collector.emit(
                            org.apache.stormcrawler.Constants.StatusStreamName,
                            input,
                            values4status);
                } else {
                    collector.emit(
                            Utils.DEFAULT_STREAM_ID,
                            input,
                            new Values(urlString, response.getContent(), mergedMD));
                }
            } else if (status.equals(Status.REDIRECTION)) {

                // find the URL it redirects to
                String redirection = response.getMetadata().getFirstValue(HttpHeaders.LOCATION);

                // store the URL it redirects to
                // (mainly for debugging) without resolving the
                // target URL
                if (StringUtils.isNotBlank(redirection)) {
                    mergedMD.setValue("_redirTo", redirection);
                }

                if (allowRedirs() && StringUtils.isNotBlank(redirection)) {
                    emitOutlink(input, url, redirection, mergedMD);
                }
                // Mark URL as redirected
                collector.emit(
                        org.apache.stormcrawler.Constants.StatusStreamName, input, values4status);
            } else {
                // Error
                collector.emit(
                        org.apache.stormcrawler.Constants.StatusStreamName, input, values4status);
            }

        } catch (Exception e) {

            String message = e.getMessage();
            if (message == null) message = "";

            // common exceptions for which we log only a short message
            if (e.getCause() instanceof java.util.concurrent.TimeoutException
                    || message.contains(" timed out")) {
                LOG.error("Socket timeout fetching {}", urlString);
                message = "Socket timeout fetching";
            } else if (e.getCause() instanceof java.net.UnknownHostException
                    || e instanceof java.net.UnknownHostException) {
                LOG.error("Unknown host {}", urlString);
                message = "Unknown host";
            } else {
                LOG.error("Exception while fetching {}", urlString, e);
                message = e.getClass().getName();
            }
            eventCounter.scope("exception").incrBy(1);

            // the metadata could be the empty, immutable instance:
            // replace it with a mutable one before writing to it
            if (metadata.size() == 0) {
                metadata = new Metadata();
            }

            // add the reason of the failure in the metadata
            metadata.setValue("fetch.exception", message);

            collector.emit(
                    org.apache.stormcrawler.Constants.StatusStreamName,
                    input,
                    new Values(urlString, metadata, Status.FETCH_ERROR));
        }
        activeThreads.decrementAndGet();

        // update the throttler
        throttler.put(key, System.currentTimeMillis() + delay);

        collector.ack(input);
    }
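
The throttler used above behaves as a per-key clock: getIfPresent returns the earliest
timestamp at which the next fetch for a politeness key is allowed, and the put at the end
of execute() pushes that timestamp forward by the computed delay. A minimal sketch of how
such a cache could be declared, assuming Guava's CacheBuilder; the 30-second expiry is
illustrative and not taken from the source:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Maps a politeness key (host, domain or IP) to the earliest time, in
    // milliseconds since the epoch, at which the next fetch is allowed.
    // Entries expire on their own so that keys for idle hosts do not accumulate.
    private final Cache<String, Long> throttler =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(30, TimeUnit.SECONDS) // illustrative expiry
                    .build();

Using an expiring cache rather than a plain Map keeps the structure bounded: an entry that
drops out simply means no wait is required for that key, which is exactly how the
getIfPresent null check above treats a missing entry.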