public void execute()

in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [923:989]


    /**
     * Handles one incoming tuple: either a tick tuple (used to check for the debug trigger file)
     * or a tuple carrying a URL to be queued for fetching.
     *
     * <p>For a URL tuple the method:
     *
     * <ol>
     *   <li>waits, in 500 ms steps, while {@code activeThreads + inQueues} is at or above {@code
     *       maxNumberURLsInQueues} (skipped when that limit is {@code -1});
     *   <li>acks and drops the tuple if its {@code url} field is blank;
     *   <li>emits an {@code ERROR} status on the status stream and acks the tuple if the URL is
     *       malformed;
     *   <li>otherwise hands the URL to the fetch queues, failing the tuple if it is not accepted.
     * </ol>
     *
     * <p>If the thread is interrupted while waiting for room in the queues, the interrupt status
     * is restored, the tuple is failed and the method returns without queuing the URL.
     *
     * @param input the tuple to process; URL tuples are read through the fields {@code url} and
     *     {@code metadata}
     */
    public void execute(Tuple input) {

        if (TupleUtils.isTick(input)) {
            // detect whether there is a file indicating that we should
            // dump the content of the queues to the log
            if (debugfiletrigger != null && debugfiletrigger.exists()) {
                LOG.info("Found trigger file {}", debugfiletrigger);
                logQueuesContent();
                // if the file cannot be removed the dump is repeated on every
                // tick, so make the failure visible instead of ignoring it
                if (!debugfiletrigger.delete()) {
                    LOG.warn("Could not delete trigger file {}", debugfiletrigger);
                }
            }
            return;
        }

        if (this.maxNumberURLsInQueues != -1) {
            // hold the tuple back until the number of URLs being fetched or
            // waiting in the queues drops below the configured maximum
            while (this.activeThreads.get() + this.fetchQueues.inQueues.get()
                    >= maxNumberURLsInQueues) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    LOG.error("Interrupted exception caught in execute method");
                    Thread.currentThread().interrupt();
                    // with the interrupt status restored, the next sleep would
                    // throw straight away and the loop would spin without
                    // pausing; stop waiting and fail the tuple instead
                    collector.fail(input);
                    return;
                }
                LOG.debug(
                        "[Fetcher #{}] Threads : {}\tqueues : {}\tin_queues : {}",
                        taskID,
                        this.activeThreads.get(),
                        this.fetchQueues.queues.size(),
                        this.fetchQueues.inQueues.get());
            }
        }

        final String urlString = input.getStringByField("url");
        if (StringUtils.isBlank(urlString)) {
            LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, input);
            // nothing to fetch: ack so that the tuple is not replayed
            collector.ack(input);
            return;
        }

        LOG.debug("Received in Fetcher {}", urlString);

        URL url;

        try {
            url = new URL(urlString);
        } catch (MalformedURLException e) {
            LOG.error("{} is a malformed URL", urlString);

            Metadata metadata = (Metadata) input.getValueByField("metadata");
            if (metadata == null) {
                metadata = new Metadata();
            }
            // Report to status stream and ack
            metadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
            collector.emit(
                    org.apache.stormcrawler.Constants.StatusStreamName,
                    input,
                    new Values(urlString, metadata, Status.ERROR));
            collector.ack(input);
            return;
        }

        // the fetch queues take over the tuple on success; a rejected item is
        // failed here
        boolean added = fetchQueues.addFetchItem(url, urlString, input);
        if (!added) {
            collector.fail(input);
        }
    }