public void execute()

in core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java [168:430]


    /**
     * Parses the HTML content carried by {@code tuple} with JSoup and emits the outcome.
     *
     * <p>The tuple is read through its {@code content} (bytes), {@code url} and {@code metadata}
     * fields. Processing goes through the following steps, each of which may end the method
     * early:
     *
     * <ol>
     *   <li>Content-type check: the type found in the protocol metadata (optionally refined by
     *       {@code guessMimeType} when {@code detectMimeType} is set) must contain "html", or be
     *       blank. Otherwise the document is either reported through {@code handleException} or
     *       passed on unparsed with an empty text, depending on {@code treat_non_html_as_error}.
     *   <li>Charset identification and decoding of the bytes, then parsing with JSoup.
     *   <li>Robots directives (headers and/or meta tags, unless skipped by configuration) are
     *       normalised into the metadata and drive link extraction.
     *   <li>Meta refresh handling (unless {@code ignoreMetaRedirections}): the URL is emitted on
     *       the status stream as {@code Status.REDIRECTION} and the tuple is acked.
     *   <li>JSoup filters and parse filters are applied to the {@code ParseResult}.
     *   <li>Outlinks are emitted as {@code Status.DISCOVERED} when {@code emitOutlinks} is set,
     *       every document of the parse result is emitted, and the tuple is acked.
     * </ol>
     *
     * <p>Failures in mime-type guessing, parsing or filtering are delegated to {@code
     * handleException}; this method does not ack or fail the tuple itself in those cases.
     *
     * @param tuple input tuple exposing the {@code content}, {@code url} and {@code metadata}
     *     fields
     */
    public void execute(Tuple tuple) {

        final byte[] content = tuple.getBinaryByField("content");
        final String url = tuple.getStringByField("url");
        final Metadata metadata = (Metadata) tuple.getValueByField("metadata");

        LOG.info("Parsing : starting {}", url);

        // check that its content type is HTML
        // look at value found in HTTP headers
        String mimeType = metadata.getFirstValue(HttpHeaders.CONTENT_TYPE, this.protocolMDprefix);

        if (detectMimeType) {
            try {
                mimeType = guessMimeType(url, mimeType, content);
            } catch (Exception e) {
                String errorMessage = "Exception while guessing mimetype on " + url + ": " + e;
                handleException(url, e, metadata, tuple, "mimetype guessing", errorMessage);
                return;
            }
            // store identified type in md
            metadata.setValue("parse.Content-Type", mimeType);
        }

        // go ahead even if no mimetype is available; otherwise it must mention html
        final boolean htmlOrUnknown =
                StringUtils.isBlank(mimeType)
                        || mimeType.toLowerCase(Locale.ROOT).contains("html");

        if (!htmlOrUnknown) {
            if (this.treat_non_html_as_error) {
                String errorMessage = "Exception content-type " + mimeType + " for " + url;
                RuntimeException e = new RuntimeException(errorMessage);
                handleException(url, e, metadata, tuple, "content-type checking", errorMessage);
            } else {
                // not an error: forward the raw content with an empty text
                LOG.info("Unsupported mimetype {} - passing on : {}", mimeType, url);
                collector.emit(tuple, new Values(url, content, metadata, ""));
                collector.ack(tuple);
            }
            return;
        }

        // reference point for all the timings logged below
        long start = System.currentTimeMillis();

        String charset;

        if (fastCharsetDetection) {
            charset =
                    CharsetIdentification.getCharsetFast(
                            metadata, content, maxLengthCharsetDetection);
        } else {
            charset =
                    CharsetIdentification.getCharset(metadata, content, maxLengthCharsetDetection);
        }

        LOG.debug(
                "Charset identified as {} in {} msec",
                charset,
                (System.currentTimeMillis() - start));

        RobotsTags robotsTags = new RobotsTags();

        // get the robots tags from the fetch metadata
        if (!robotsHeaderSkip) {
            robotsTags = new RobotsTags(metadata, this.protocolMDprefix);
        }

        // target URL -> anchor texts pointing to it
        Map<String, List<String>> slinks;
        String text;
        final org.jsoup.nodes.Document jsoupDoc;

        try {
            String html = Charset.forName(charset).decode(ByteBuffer.wrap(content)).toString();

            jsoupDoc = Parser.htmlParser().parseInput(html, url);

            if (!robotsMetaSkip) {
                // extracts the robots directives from the meta tags
                Element robotelement = jsoupDoc.selectFirst("meta[name~=(?i)robots][content]");
                if (robotelement != null) {
                    robotsTags.extractMetaTags(robotelement.attr("content"));
                }
            }

            // store a normalised representation in metadata
            // so that the indexer is aware of it
            robotsTags.normaliseToMetadata(metadata);

            // do not extract the links if no follow has been set
            // and we are in strict mode
            if (robotsTags.isNoFollow() && robots_noFollow_strict) {
                slinks = new HashMap<>(0);
            } else {
                final Elements links = jsoupDoc.select("a[href]");
                slinks = new HashMap<>(links.size());
                final URL baseURL = new URL(url);
                for (Element link : links) {
                    // nofollow
                    String[] relkeywords = link.attr("rel").split(" ");
                    boolean noFollow =
                            Stream.of(relkeywords).anyMatch(x -> x.equalsIgnoreCase("nofollow"));

                    // remove altogether
                    if (noFollow && robots_noFollow_strict) {
                        continue;
                    }

                    // link not specifically marked as no follow
                    // but whole page is
                    if (!noFollow && robotsTags.isNoFollow()) {
                        noFollow = true;
                    }

                    String targetURL = null;

                    try {
                        // abs:href tells jsoup to return fully qualified domains
                        // for relative urls
                        // but it is very slow as it builds intermediate URL objects
                        // and normalises the URL of the document every time
                        targetURL = URLUtil.resolveURL(baseURL, link.attr("href")).toExternalForm();
                    } catch (MalformedURLException e) {
                        LOG.debug(
                                "Cannot resolve URL with baseURL : {} and href : {}",
                                baseURL,
                                link.attr("href"),
                                e);
                    }

                    // unresolvable link: skip it
                    if (StringUtils.isBlank(targetURL)) {
                        continue;
                    }

                    // the target is registered even when no anchor ends up being kept
                    final List<String> anchors =
                            slinks.computeIfAbsent(targetURL, a -> new LinkedList<>());

                    // any existing anchors for the same target?
                    final String anchor = link.text();
                    // track the anchors only if no follow is false
                    if (!noFollow && StringUtils.isNotBlank(anchor)) {
                        anchors.add(anchor);
                    }
                }
            }

            Element body = jsoupDoc.body();
            text = textExtractor.text(body);

        } catch (Throwable e) {
            // any failure while decoding, parsing or extracting is reported the same way
            String errorMessage = "Exception while parsing " + url + ": " + e;
            handleException(url, e, metadata, tuple, "content parsing", errorMessage);
            return;
        }

        // store identified charset in md
        metadata.setValue("parse.Content-Encoding", charset);

        // track that is has been successfully handled
        metadata.setValue("parsed.by", this.getClass().getName());

        long duration = System.currentTimeMillis() - start;

        LOG.info("Parsed {} in {} msec", url, duration);

        // redirection?
        if (!ignoreMetaRedirections) {
            try {
                final String redirection = RefreshTag.extractRefreshURL(jsoupDoc);
                if (StringUtils.isNotBlank(redirection)) {
                    // stores the URL it redirects to
                    // used for debugging mainly - do not resolve the target
                    // URL
                    LOG.info("Found redir in {} to {}", url, redirection);
                    metadata.setValue("_redirTo", redirection);

                    // https://github.com/apache/incubator-stormcrawler/issues/954
                    // redirection is known to be non-blank at this point
                    if (allowRedirs()) {
                        emitOutlink(tuple, new URL(url), redirection, metadata);
                    }

                    // Mark URL as redirected
                    collector.emit(
                            org.apache.stormcrawler.Constants.StatusStreamName,
                            tuple,
                            new Values(url, metadata, Status.REDIRECTION));
                    collector.ack(tuple);
                    eventCounter.scope("tuple_success").incr();
                    return;
                }
            } catch (MalformedURLException e) {
                // keep the cause in the log; processing carries on as for a regular page
                LOG.error("MalformedURLException on {}", url, e);
            }
        }

        List<Outlink> outlinks = toOutlinks(url, metadata, slinks);

        ParseResult parse = new ParseResult(outlinks);

        // parse data of the parent URL
        ParseData parseData = parse.get(url);
        parseData.setMetadata(metadata);
        parseData.setText(text);
        parseData.setContent(content);

        // apply the JSoup filters if any
        try {
            jsoupFilters.filter(url, content, jsoupDoc, parse);
        } catch (RuntimeException e) {
            String errorMessage = "Exception while running jsoup filters on " + url + ": " + e;
            handleException(url, e, metadata, tuple, "jsoup filtering", errorMessage);
            return;
        }

        // apply the parse filters if any
        try {
            DocumentFragment fragment = null;
            // lazy building of fragment
            if (parseFilters.needsDOM()) {
                fragment = DocumentFragmentBuilder.fromJsoup(jsoupDoc);
            }
            parseFilters.filter(url, content, fragment, parse);
        } catch (RuntimeException e) {
            String errorMessage = "Exception while running parse filters on " + url + ": " + e;
            handleException(url, e, metadata, tuple, "content filtering", errorMessage);
            return;
        }

        if (emitOutlinks) {
            for (Outlink outlink : parse.getOutlinks()) {
                collector.emit(
                        StatusStreamName,
                        tuple,
                        new Values(
                                outlink.getTargetURL(), outlink.getMetadata(), Status.DISCOVERED));
            }
        }

        // emit each document/subdocument in the ParseResult object
        // there should be at least one ParseData item for the "parent" URL

        for (Map.Entry<String, ParseData> doc : parse) {
            ParseData parseDoc = doc.getValue();
            collector.emit(
                    tuple,
                    new Values(
                            doc.getKey(),
                            parseDoc.getContent(),
                            parseDoc.getMetadata(),
                            parseDoc.getText()));
        }

        LOG.info("Total for {} - {} msec", url, System.currentTimeMillis() - start);

        collector.ack(tuple);
        eventCounter.scope("tuple_success").incr();
    }