in core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java [139:242]
/**
 * Processes one status update: skips DISCOVERED URLs already present in the cache, updates the
 * bookkeeping metadata (discovery / last-processed date, fetch error count, error causes),
 * computes the next fetch date via the scheduler and hands the result to {@code store}.
 *
 * <p>If the metadata carries a value for {@code AS_IS_NEXTFETCHDATE_METADATA}, the URL is
 * stored with that date and its status unchanged, skipping the bookkeeping and scheduling
 * steps (the metadata is still filtered before storing).
 *
 * <p>The tuple is acked here only on a cache hit. It is failed if the as-is date cannot be
 * parsed or if {@code store} throws. After a successful {@code store} this method does not ack
 * the tuple itself; presumably that is left to the {@code store} implementation.
 *
 * @param tuple incoming tuple with fields "url", "status" and "metadata"
 */
public void execute(Tuple tuple) {
    String url = tuple.getStringByField("url");
    Status status = (Status) tuple.getValueByField("status");
    boolean potentiallyNew = status.equals(Status.DISCOVERED);

    // if the URL is a freshly discovered one, check whether it is already
    // known in the cache; if so we've already seen it and don't need to
    // store it again
    if (potentiallyNew && useCache) {
        if (cache.getIfPresent(url) != null) {
            // no need to add it to the queue
            LOG.debug("URL {} already in cache", url);
            cacheHits++;
            _collector.ack(tuple);
            return;
        }
        LOG.debug("URL {} not in cache", url);
        cacheMisses++;
    }

    Metadata metadata = (Metadata) tuple.getValueByField("metadata");

    // store directly with the date specified in the metadata without
    // changing the status or scheduling.
    String dateInMetadata = metadata.getFirstValue(AS_IS_NEXTFETCHDATE_METADATA);
    if (dateInMetadata != null) {
        Date asIsNextFetch;
        try {
            // Instant.parse throws DateTimeParseException on malformed input and
            // Date.from throws IllegalArgumentException when out of range; neither
            // must escape execute(), otherwise the tuple is neither acked nor failed
            asIsNextFetch = Date.from(Instant.parse(dateInMetadata));
        } catch (RuntimeException e) {
            LOG.error(
                    "Invalid value {} for {} on URL {}",
                    dateInMetadata,
                    AS_IS_NEXTFETCHDATE_METADATA,
                    url,
                    e);
            _collector.fail(tuple);
            return;
        }
        try {
            store(url, status, mdTransfer.filter(metadata), Optional.of(asIsNextFetch), tuple);
        } catch (Exception e) {
            LOG.error("Exception caught when storing", e);
            _collector.fail(tuple);
        }
        return;
    }

    // store last processed or discovery date in UTC;
    // status has not been modified yet so potentiallyNew still reflects DISCOVERED
    final String nowAsString = Instant.now().toString();
    if (potentiallyNew) {
        metadata.setValue("discoveryDate", nowAsString);
    } else {
        metadata.setValue("lastProcessedDate", nowAsString);
    }

    // too many fetch errors? escalate FETCH_ERROR to ERROR once the threshold is reached
    if (status.equals(Status.FETCH_ERROR)) {
        String errorCount = metadata.getFirstValue(Constants.fetchErrorCountParamName);
        int count = 0;
        if (errorCount != null) {
            try {
                count = Integer.parseInt(errorCount);
            } catch (NumberFormatException ignored) {
                // malformed counter in the metadata: restart counting from zero
            }
        }
        count++;
        if (count >= maxFetchErrors) {
            status = Status.ERROR;
            metadata.setValue(Constants.STATUS_ERROR_CAUSE, "maxFetchErrors");
        } else {
            metadata.setValue(Constants.fetchErrorCountParamName, Integer.toString(count));
        }
    }

    // delete any existing error count metadata
    // e.g. status changed
    if (!status.equals(Status.FETCH_ERROR)) {
        metadata.remove(Constants.fetchErrorCountParamName);
    }

    if (status.equals(Status.FETCHED) || status.equals(Status.REDIRECTION)) {
        // https://github.com/apache/incubator-stormcrawler/issues/415
        // remove error related key values in case of success
        metadata.remove(Constants.STATUS_ERROR_CAUSE);
        metadata.remove(Constants.STATUS_ERROR_MESSAGE);
        metadata.remove(Constants.STATUS_ERROR_SOURCE);
    } else if (status.equals(Status.ERROR)) {
        // gone? notify any deleters. Doesn't need to be anchored
        _collector.emit(Constants.DELETION_STREAM_NAME, new Values(url, metadata));
    }

    // determine the value of the next fetch based on the status
    // (an empty Optional is passed through unchanged, i.e. never refetch)
    Optional<Date> nextFetch = scheduler.schedule(status, metadata);

    // filter metadata just before storing it, so that non-persisted
    // metadata is available to fetch schedulers
    metadata = mdTransfer.filter(metadata);

    // round next fetch date - unless it is never
    nextFetch = nextFetch.map(date -> DateUtils.round(date, this.roundDateUnit));

    // extensions of this class will handle the storage
    // on a per document basis
    try {
        store(url, status, metadata, nextFetch, tuple);
    } catch (Exception e) {
        LOG.error("Exception caught when storing", e);
        _collector.fail(tuple);
    }
}