in core/src/main/java/org/apache/stormcrawler/persistence/AdaptiveScheduler.java [201:317]
public Optional<Date> schedule(Status status, Metadata metadata) {
    // Computes the next fetch date, adapting the per-URL fetch interval
    // (stored in minutes under FETCH_INTERVAL_KEY) to how often the content
    // changes:
    // - status != FETCHED: the adaptive metadata is cleared and scheduling is
    //   delegated to the parent scheduler;
    // - FETCHED, not a 304, and current or old signature missing: delegated
    //   to the parent scheduler; the resulting interval in minutes is recorded;
    // - signature changed: the interval is shrunk toward minFetchInterval;
    // - signature unchanged or HTTP 304: the interval is grown, capped at
    //   maxFetchInterval.
    // Returns "now" (GMT calendar) plus the resulting interval in minutes.
    LOG.debug("Scheduling status: {}, metadata: {}", status, metadata);
    String signature = metadata.getFirstValue(SIGNATURE_KEY);
    String oldSignature = metadata.getFirstValue(SIGNATURE_OLD_KEY);
    if (status != Status.FETCHED) {
        // reset all metadata
        metadata.remove(SIGNATURE_MODIFIED_KEY);
        metadata.remove(FETCH_INTERVAL_KEY);
        metadata.remove(SIGNATURE_KEY);
        metadata.remove(SIGNATURE_OLD_KEY);
        if (status == Status.ERROR) {
            /*
             * remove last-modified for permanent errors so that no
             * if-modified-since request is sent: the content is needed
             * again to be parsed and indexed
             */
            metadata.remove(HttpHeaders.LAST_MODIFIED);
        }
        // fall-back to DefaultScheduler
        return super.schedule(status, metadata);
    }
    Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT"), Locale.ROOT);
    String signatureModified = metadata.getFirstValue(SIGNATURE_MODIFIED_KEY);
    boolean changed = false;
    final String modifiedTimeString = now.toInstant().toString();
    // The status code may be absent from the metadata; compare null-safely
    // instead of dereferencing the value, which would throw an NPE.
    final String statusCode = metadata.getFirstValue("fetch.statusCode");
    if ("304".equals(statusCode)) {
        // HTTP 304 Not Modified
        // - no new signature calculated because no content fetched
        // - do not compare persisted signatures
        // - leave last-modified time unchanged
    } else if (signature == null || oldSignature == null) {
        // no decision possible by signature comparison if
        // - document not parsed (intentionally or not) or
        // - signature not generated or
        // - old signature not copied
        // fall-back to DefaultScheduler
        LOG.debug("No signature for FETCHED page: {}", metadata);
        if (setLastModified && signature != null) {
            // set last-modified time for first fetch
            metadata.setValue(HttpHeaders.LAST_MODIFIED, modifiedTimeString);
        }
        Optional<Date> nextFetch = super.schedule(status, metadata);
        if (nextFetch.isPresent()) {
            // remember the interval chosen by the parent scheduler so that
            // later fetches can adapt it
            long fetchIntervalMinutes =
                    Duration.between(now.toInstant(), nextFetch.get().toInstant()).toMinutes();
            metadata.setValue(FETCH_INTERVAL_KEY, Long.toString(fetchIntervalMinutes));
        }
        return nextFetch;
    } else if (signature.equals(oldSignature)) {
        // unchanged
    } else {
        // change detected by signature comparison
        changed = true;
        signatureModified = modifiedTimeString;
        if (setLastModified) {
            metadata.setValue(HttpHeaders.LAST_MODIFIED, modifiedTimeString);
        }
    }
    String fetchInterval = metadata.getFirstValue(FETCH_INTERVAL_KEY);
    int interval;
    if (fetchInterval != null) {
        interval = Integer.parseInt(fetchInterval);
    } else {
        // initialize from DefaultScheduler
        Optional<Integer> customInterval = super.checkCustomInterval(metadata, status);
        if (customInterval.isPresent()) {
            interval = customInterval.get();
        } else {
            interval = defaultfetchInterval;
        }
        fetchInterval = Integer.toString(interval);
    }
    if (changed) {
        // shrink fetch interval (slow down decrementing if already close to
        // the minimum interval): weighted average of current and minimum
        interval =
                (int)
                        ((1.0f - fetchIntervalDecRate) * interval
                                + fetchIntervalDecRate * minFetchInterval);
        LOG.debug(
                "Signature has changed, fetchInterval decreased from {} to {}",
                fetchInterval,
                interval);
    } else {
        // no change or not modified, increase fetch interval
        interval = (int) (interval * (1.0f + fetchIntervalIncRate));
        if (interval > maxFetchInterval) {
            interval = maxFetchInterval;
        }
        LOG.debug("Unchanged, fetchInterval increased from {} to {}", fetchInterval, interval);
        // remove old signature (do not keep same signature twice)
        metadata.remove(SIGNATURE_OLD_KEY);
        if (signatureModified == null) {
            signatureModified = modifiedTimeString;
        }
    }
    metadata.setValue(FETCH_INTERVAL_KEY, Integer.toString(interval));
    metadata.setValue(SIGNATURE_MODIFIED_KEY, signatureModified);
    now.add(Calendar.MINUTE, interval);
    return Optional.of(now.getTime());
}