public void execute()

in core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java [72:164]


    /**
     * Handles one fetched document and, when it is recognised as a feed, extracts its outlinks.
     *
     * <p>A document counts as a feed when its metadata holds a true value under {@code isFeedKey}
     * or, if {@code sniffWhenNoMDKey} is enabled, when its content type contains {@code rss+xml}
     * or the marker {@code "<rss "} occurs within the first 100 bytes of the content.
     *
     * <ul>
     *   <li>Non-feeds are re-emitted with their original values, anchored on the input tuple.
     *   <li>For feeds, every outlink held by the parse result after the parse filters ran is
     *       emitted on the status stream as {@code DISCOVERED}, followed by the feed URL itself
     *       as {@code FETCHED}.
     *   <li>An exception raised by the feed parsing, or a runtime exception raised by the parse
     *       filters, is reported on the status stream as {@code ERROR} and nothing else is
     *       emitted for that tuple.
     * </ul>
     *
     * <p>The input tuple is acked on every path that returns normally.
     *
     * @param tuple input providing the {@code "metadata"}, {@code "content"} and {@code "url"}
     *     fields
     */
    public void execute(Tuple tuple) {
        Metadata metadata = (Metadata) tuple.getValueByField("metadata");
        byte[] content = tuple.getBinaryByField("content");
        String url = tuple.getStringByField("url");

        LOG.debug("Processing {}", url);

        boolean feedDetected = Boolean.parseBoolean(metadata.getFirstValue(isFeedKey));

        // the metadata does not flag the document as a feed: optionally try to
        // guess from what the server returned
        if (!feedDetected && sniffWhenNoMDKey) {
            // mime-type first; this won't work when servers return text/xml
            // TODO use Tika instead?
            String contentType =
                    metadata.getFirstValue(HttpHeaders.CONTENT_TYPE, protocolMDprefix);
            if (contentType != null && contentType.contains("rss+xml")) {
                feedDetected = true;
            } else {
                // otherwise look for an rss tag, restricted to the leading bytes
                final int sniffLimit = 100;
                byte[] head =
                        content.length > sniffLimit
                                ? Arrays.copyOfRange(content, 0, sniffLimit)
                                : content;
                byte[] rssMarker = "<rss ".getBytes(StandardCharsets.UTF_8);
                if (Bytes.indexOf(head, rssMarker) >= 0) {
                    LOG.info("{} detected as rss feed based on content", url);
                    feedDetected = true;
                }
            }
        }

        // not a feed after all: forward the tuple untouched and stop here
        if (!feedDetected) {
            LOG.debug("Not a feed {}", url);
            collector.emit(tuple, tuple.getValues());
            collector.ack(tuple);
            return;
        }

        // record the outcome; can be used later on for custom scheduling
        metadata.setValue(isFeedKey, "true");

        List<Outlink> feedLinks;
        try {
            feedLinks = parseFeed(url, content, metadata);
        } catch (Exception e) {
            // the feed could not be parsed: report the failure on the status
            // stream in case another component wants to update its status
            String errorMessage = "Exception while parsing " + url + ": " + e;
            LOG.error(errorMessage, e);
            metadata.setValue(Constants.STATUS_ERROR_SOURCE, "feed parsing");
            metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
            collector.emit(
                    Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
            collector.ack(tuple);
            return;
        }

        // wrap the outlinks and the document metadata so that the parse
        // filters, if any, can be applied to them
        ParseResult parse = new ParseResult(feedLinks);
        parse.set(url, metadata);

        try {
            parseFilters.filter(url, content, null, parse);
        } catch (RuntimeException e) {
            // a filter failed: report the failure and give up on this document
            String errorMessage = "Exception while running parse filters on " + url + ": " + e;
            LOG.error(errorMessage, e);
            metadata.setValue(Constants.STATUS_ERROR_SOURCE, "content filtering");
            metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
            collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
            collector.ack(tuple);
            return;
        }

        // every outlink of the parse result goes to the status stream as newly
        // discovered
        for (Outlink link : parse.getOutlinks()) {
            collector.emit(
                    Constants.StatusStreamName,
                    tuple,
                    new Values(link.getTargetURL(), link.getMetadata(), Status.DISCOVERED));
        }

        LOG.info("Feed parser done {}", url);

        // parsing and filtering both went through: mark the feed URL itself as
        // successfully fetched
        collector.emit(
                Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
        collector.ack(tuple);
    }