in core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java [168:430]
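    /**
     * Parses the fetched content with JSoup: checks the content type, detects the
     * charset, honours robots directives, extracts outlinks and text, runs the
     * JSoup and parse filters, then emits the resulting document(s) and, where
     * relevant, status updates.
     */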
    public void execute(Tuple tuple) {
        final byte[] content = tuple.getBinaryByField("content");
        final String url = tuple.getStringByField("url");
        final Metadata metadata = (Metadata) tuple.getValueByField("metadata");

        LOG.info("Parsing : starting {}", url);

        // check that its content type is HTML
        // look at value found in HTTP headers
        boolean CT_OK = false;

        String mimeType = metadata.getFirstValue(HttpHeaders.CONTENT_TYPE, this.protocolMDprefix);
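        // optionally sniff the actual bytes: servers frequently send a missing or
        // inaccurate Content-Type header, so content-based detection can override it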
        if (detectMimeType) {
            try {
                mimeType = guessMimeType(url, mimeType, content);
            } catch (Exception e) {
                String errorMessage = "Exception while guessing mimetype on " + url + ": " + e;
                handleException(url, e, metadata, tuple, "mimetype guessing", errorMessage);
                return;
            }
            // store identified type in md
            metadata.setValue("parse.Content-Type", mimeType);
        }
        if (StringUtils.isNotBlank(mimeType)) {
            if (mimeType.toLowerCase(Locale.ROOT).contains("html")) {
                CT_OK = true;
            }
        }
        // go ahead even if no mimetype is available
        else {
            CT_OK = true;
        }

        if (!CT_OK) {
            if (this.treat_non_html_as_error) {
                String errorMessage = "Unsupported content-type " + mimeType + " for " + url;
                RuntimeException e = new RuntimeException(errorMessage);
                handleException(url, e, metadata, tuple, "content-type checking", errorMessage);
            } else {
                LOG.info("Unsupported mimetype {} - passing on : {}", mimeType, url);
                collector.emit(tuple, new Values(url, content, metadata, ""));
                collector.ack(tuple);
            }
            return;
        }
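        // identify the charset needed to decode the raw bytes; the fast variant is
        // cheaper but may be less accurate than the full content-based detection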
        long start = System.currentTimeMillis();

        String charset;
        if (fastCharsetDetection) {
            charset =
                    CharsetIdentification.getCharsetFast(
                            metadata, content, maxLengthCharsetDetection);
        } else {
            charset =
                    CharsetIdentification.getCharset(metadata, content, maxLengthCharsetDetection);
        }

        LOG.debug(
                "Charset identified as {} in {} msec",
                charset,
                (System.currentTimeMillis() - start));
        RobotsTags robotsTags = new RobotsTags();

        // get the robots tags from the fetch metadata
        if (!robotsHeaderSkip) {
            robotsTags = new RobotsTags(metadata, this.protocolMDprefix);
        }
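        // slinks maps each resolved target URL to the anchor texts pointing at it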
        Map<String, List<String>> slinks;
        String text;
        final org.jsoup.nodes.Document jsoupDoc;
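        // decode the bytes with the detected charset and parse the HTML with JSoup;
        // anything thrown here (including Errors) is caught and reported below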
        try {
            String html = Charset.forName(charset).decode(ByteBuffer.wrap(content)).toString();

            jsoupDoc = Parser.htmlParser().parseInput(html, url);

            if (!robotsMetaSkip) {
                // extracts the robots directives from the meta tags
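                // e.g. <meta name="robots" content="noindex, nofollow">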
                Element robotelement = jsoupDoc.selectFirst("meta[name~=(?i)robots][content]");
                if (robotelement != null) {
                    robotsTags.extractMetaTags(robotelement.attr("content"));
                }
            }

            // store a normalised representation in metadata
            // so that the indexer is aware of it
            robotsTags.normaliseToMetadata(metadata);
            // do not extract the links if no follow has been set
            // and we are in strict mode
            if (robotsTags.isNoFollow() && robots_noFollow_strict) {
                slinks = new HashMap<>(0);
            } else {
                final Elements links = jsoupDoc.select("a[href]");
                slinks = new HashMap<>(links.size());
                final URL baseURL = new URL(url);

                for (Element link : links) {
                    // nofollow
                    String[] relkeywords = link.attr("rel").split(" ");
                    boolean noFollow =
                            Stream.of(relkeywords).anyMatch(x -> x.equalsIgnoreCase("nofollow"));
                    // remove altogether
                    if (noFollow && robots_noFollow_strict) {
                        continue;
                    }

                    // link not specifically marked as no follow
                    // but whole page is
                    if (!noFollow && robotsTags.isNoFollow()) {
                        noFollow = true;
                    }

                    String targetURL = null;
                    try {
                        // jsoup's abs:href would return fully qualified URLs for
                        // relative hrefs, but it is very slow as it builds
                        // intermediate URL objects and normalises the document's
                        // URL every time, so resolve against the base URL instead
                        targetURL = URLUtil.resolveURL(baseURL, link.attr("href")).toExternalForm();
                    } catch (MalformedURLException e) {
                        LOG.debug(
                                "Cannot resolve URL with baseURL : {} and href : {}",
                                baseURL,
                                link.attr("href"),
                                e);
                    }
                    if (StringUtils.isBlank(targetURL)) {
                        continue;
                    }
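                    // register the target even for nofollow links: the outlink is
                    // kept, only its anchor text is dropped below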
                    final List<String> anchors =
                            slinks.computeIfAbsent(targetURL, a -> new LinkedList<>());

                    // any existing anchors for the same target?
                    final String anchor = link.text();
                    // track the anchor only when the link may be followed
                    if (!noFollow && StringUtils.isNotBlank(anchor)) {
                        anchors.add(anchor);
                    }
                }
            }
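            // extract the text from the <body> with the configured TextExtractor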
            Element body = jsoupDoc.body();
            text = textExtractor.text(body);

        } catch (Throwable e) {
            String errorMessage = "Exception while parsing " + url + ": " + e;
            handleException(url, e, metadata, tuple, "content parsing", errorMessage);
            return;
        }
        // store identified charset in md
        metadata.setValue("parse.Content-Encoding", charset);

        // track that it has been successfully handled
        metadata.setValue("parsed.by", this.getClass().getName());

        long duration = System.currentTimeMillis() - start;

        LOG.info("Parsed {} in {} msec", url, duration);
        // redirection?
        if (!ignoreMetaRedirections) {
            try {
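                // e.g. <meta http-equiv="refresh" content="0; url=https://example.com/">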
                final String redirection = RefreshTag.extractRefreshURL(jsoupDoc);
                if (StringUtils.isNotBlank(redirection)) {
                    // stores the URL it redirects to
                    // used for debugging mainly - do not resolve the target
                    // URL
                    LOG.info("Found redir in {} to {}", url, redirection);
                    metadata.setValue("_redirTo", redirection);

                    // https://github.com/apache/incubator-stormcrawler/issues/954
                    if (allowRedirs() && StringUtils.isNotBlank(redirection)) {
                        emitOutlink(tuple, new URL(url), redirection, metadata);
                    }

                    // Mark URL as redirected
                    collector.emit(
                            org.apache.stormcrawler.Constants.StatusStreamName,
                            tuple,
                            new Values(url, metadata, Status.REDIRECTION));
                    collector.ack(tuple);
                    eventCounter.scope("tuple_success").incr();
                    return;
                }
            } catch (MalformedURLException e) {
                LOG.error("MalformedURLException on {}", url);
            }
        }
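        // convert the link/anchor map into Outlink objects for the ParseResult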
        List<Outlink> outlinks = toOutlinks(url, metadata, slinks);

        ParseResult parse = new ParseResult(outlinks);

        // parse data of the parent URL
        ParseData parseData = parse.get(url);
        parseData.setMetadata(metadata);
        parseData.setText(text);
        parseData.setContent(content);

        // apply the JSoup filters if any
        try {
            jsoupFilters.filter(url, content, jsoupDoc, parse);
        } catch (RuntimeException e) {
            String errorMessage = "Exception while running jsoup filters on " + url + ": " + e;
            handleException(url, e, metadata, tuple, "jsoup filtering", errorMessage);
            return;
        }
        // apply the parse filters if any
        try {
            DocumentFragment fragment = null;
            // lazy building of fragment
            if (parseFilters.needsDOM()) {
                fragment = DocumentFragmentBuilder.fromJsoup(jsoupDoc);
            }
            parseFilters.filter(url, content, fragment, parse);
        } catch (RuntimeException e) {
            String errorMessage = "Exception while running parse filters on " + url + ": " + e;
            handleException(url, e, metadata, tuple, "content filtering", errorMessage);
            return;
        }
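        // outlinks are sent to the status stream as DISCOVERED, typically consumed
        // by a status updater bolt that persists them for future fetches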
        if (emitOutlinks) {
            for (Outlink outlink : parse.getOutlinks()) {
                collector.emit(
                        StatusStreamName,
                        tuple,
                        new Values(
                                outlink.getTargetURL(), outlink.getMetadata(), Status.DISCOVERED));
            }
        }
        // emit each document/subdocument in the ParseResult object
        // there should be at least one ParseData item for the "parent" URL
        for (Map.Entry<String, ParseData> doc : parse) {
            ParseData parseDoc = doc.getValue();
            collector.emit(
                    tuple,
                    new Values(
                            doc.getKey(),
                            parseDoc.getContent(),
                            parseDoc.getMetadata(),
                            parseDoc.getText()));
        }

        LOG.info("Total for {} - {} msec", url, System.currentTimeMillis() - start);

        collector.ack(tuple);
        eventCounter.scope("tuple_success").incr();
    }