in core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java [72:164]
/**
 * Processes one fetched document.
 *
 * <p>If the document is flagged as a feed in its metadata (or, when sniffing is enabled, is
 * guessed to be an RSS feed), it is parsed. Each resulting outlink is then emitted on the
 * status stream as {@link Status#DISCOVERED}, followed by the feed URL itself as {@link
 * Status#FETCHED}. Documents that are not feeds are passed on unchanged on the default stream.
 * If parsing or the parse filters throw, the URL is emitted on the status stream as {@link
 * Status#ERROR} with the error source and message recorded in its metadata.
 *
 * <p>Every path acks the input tuple exactly once.
 *
 * @param tuple input carrying the fields {@code url}, {@code content} and {@code metadata}
 */
public void execute(Tuple tuple) {
    Metadata metadata = (Metadata) tuple.getValueByField("metadata");
    byte[] content = tuple.getBinaryByField("content");
    String url = tuple.getStringByField("url");

    LOG.debug("Processing {}", url);

    boolean isfeed = Boolean.parseBoolean(metadata.getFirstValue(isFeedKey));

    // doesn't have the metadata expected: optionally try to guess from the document itself
    if (!isfeed && sniffWhenNoMDKey) {
        isfeed = looksLikeFeed(url, content, metadata);
    }

    // still not a feed file
    if (!isfeed) {
        LOG.debug("Not a feed {}", url);
        // just pass it on
        this.collector.emit(tuple, tuple.getValues());
        this.collector.ack(tuple);
        return;
    }

    // can be used later on for custom scheduling
    metadata.setValue(isFeedKey, "true");

    List<Outlink> outlinks;
    try {
        outlinks = parseFeed(url, content, metadata);
    } catch (Exception e) {
        // exception while parsing the feed
        String errorMessage = "Exception while parsing " + url + ": " + e;
        LOG.error(errorMessage, e);
        // send to status stream in case another component wants to update
        // its status
        metadata.setValue(Constants.STATUS_ERROR_SOURCE, "feed parsing");
        metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
        collector.emit(
                Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
        this.collector.ack(tuple);
        return;
    }

    // apply the parse filters if any to the current document
    ParseResult parse = new ParseResult(outlinks);
    parse.set(url, metadata);

    try {
        parseFilters.filter(url, content, null, parse);
    } catch (RuntimeException e) {
        String errorMessage = "Exception while running parse filters on " + url + ": " + e;
        LOG.error(errorMessage, e);
        metadata.setValue(Constants.STATUS_ERROR_SOURCE, "content filtering");
        metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
        collector.emit(
                Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
        collector.ack(tuple);
        return;
    }

    // send the outlinks (possibly modified by the parse filters) to the status stream
    for (Outlink ol : parse.getOutlinks()) {
        Values v = new Values(ol.getTargetURL(), ol.getMetadata(), Status.DISCOVERED);
        collector.emit(Constants.StatusStreamName, tuple, v);
    }

    LOG.info("Feed parser done {}", url);

    // marking the main URL as successfully fetched; parse and filter failures
    // returned early above with Status.ERROR
    collector.emit(
            Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
    this.collector.ack(tuple);
}

/**
 * Guesses whether a document that lacks the feed metadata flag is an RSS feed.
 *
 * <p>First checks the protocol Content-Type header for {@code rss+xml}, ignoring case. If that
 * does not match, looks for the marker {@code "<rss "} within the first 100 bytes of the
 * content.
 *
 * @param url URL of the document, used for logging only
 * @param content raw document bytes; may be {@code null}, in which case only the header is used
 * @param metadata document metadata holding the protocol headers
 * @return {@code true} if the document looks like an RSS feed
 */
private boolean looksLikeFeed(String url, byte[] content, Metadata metadata) {
    // uses mime-type
    // won't work when servers return text/xml
    // TODO use Tika instead?
    String ct = metadata.getFirstValue(HttpHeaders.CONTENT_TYPE, protocolMDprefix);
    // media types are case-insensitive, so normalise before matching
    if (ct != null && ct.toLowerCase(java.util.Locale.ROOT).contains("rss+xml")) {
        return true;
    }

    // nothing to sniff
    if (content == null) {
        return false;
    }

    // try based on the first bytes
    final int maxOffsetGuess = 100;
    byte[] clue = "<rss ".getBytes(StandardCharsets.UTF_8);
    byte[] beginning =
            content.length > maxOffsetGuess
                    ? Arrays.copyOfRange(content, 0, maxOffsetGuess)
                    : content;
    if (Bytes.indexOf(beginning, clue) != -1) {
        LOG.info("{} detected as rss feed based on content", url);
        return true;
    }
    return false;
}