public void execute(Tuple tuple)

in core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java [139:242]


    /**
     * Processes one status tuple: deduplicates freshly discovered URLs via the
     * in-memory cache, tracks repeated fetch errors, determines the next fetch
     * date through the scheduler, and delegates persistence to the concrete
     * {@code store(...)} implementation.
     *
     * @param tuple a tuple carrying at least the fields "url", "status" and
     *     "metadata"
     */
    public void execute(Tuple tuple) {

        String url = tuple.getStringByField("url");
        Status status = (Status) tuple.getValueByField("status");

        boolean potentiallyNew = status.equals(Status.DISCOVERED);

        // if the URL is a freshly discovered one
        // check whether it is already known in the cache
        // if so we've already seen it and don't need to
        // store it again
        if (potentiallyNew && useCache) {
            if (cache.getIfPresent(url) != null) {
                // no need to add it to the queue
                LOG.debug("URL {} already in cache", url);
                cacheHits++;
                _collector.ack(tuple);
                return;
            } else {
                LOG.debug("URL {} not in cache", url);
                cacheMisses++;
            }
        }

        Metadata metadata = (Metadata) tuple.getValueByField("metadata");

        // store directly with the date specified in the metadata without
        // changing the status or scheduling.
        String dateInMetadata = metadata.getFirstValue(AS_IS_NEXTFETCHDATE_METADATA);
        if (dateInMetadata != null) {
            try {
                // parse INSIDE the try: a malformed date must fail the tuple
                // rather than propagate out of execute() and kill the executor
                Date nextFetch = Date.from(Instant.parse(dateInMetadata));
                store(url, status, mdTransfer.filter(metadata), Optional.of(nextFetch), tuple);
            } catch (Exception e) {
                LOG.error("Exception caught when storing", e);
                _collector.fail(tuple);
            }
            return;
        }

        // store last processed or discovery date in UTC
        final String nowAsString = Instant.now().toString();
        if (status.equals(Status.DISCOVERED)) {
            metadata.setValue("discoveryDate", nowAsString);
        } else {
            metadata.setValue("lastProcessedDate", nowAsString);
        }

        // too many fetch errors?
        if (status.equals(Status.FETCH_ERROR)) {
            String errorCount = metadata.getFirstValue(Constants.fetchErrorCountParamName);
            int count = 0;
            try {
                count = Integer.parseInt(errorCount);
            } catch (NumberFormatException ignored) {
                // no previous (or unparseable) counter -> start from zero
            }
            count++;
            // after maxFetchErrors consecutive failures, give up on the URL
            if (count >= maxFetchErrors) {
                status = Status.ERROR;
                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "maxFetchErrors");
            } else {
                metadata.setValue(Constants.fetchErrorCountParamName, Integer.toString(count));
            }
        }

        // delete any existing error count metadata
        // e.g. status changed
        if (!status.equals(Status.FETCH_ERROR)) {
            metadata.remove(Constants.fetchErrorCountParamName);
        }
        // https://github.com/apache/incubator-stormcrawler/issues/415
        // remove error related key values in case of success
        if (status.equals(Status.FETCHED) || status.equals(Status.REDIRECTION)) {
            metadata.remove(Constants.STATUS_ERROR_CAUSE);
            metadata.remove(Constants.STATUS_ERROR_MESSAGE);
            metadata.remove(Constants.STATUS_ERROR_SOURCE);
        }
        // gone? notify any deleters. Doesn't need to be anchored
        else if (status == Status.ERROR) {
            _collector.emit(Constants.DELETION_STREAM_NAME, new Values(url, metadata));
        }

        // determine the value of the next fetch based on the status
        Optional<Date> nextFetch = scheduler.schedule(status, metadata);

        // filter metadata just before storing it, so that non-persisted
        // metadata is available to fetch schedulers
        metadata = mdTransfer.filter(metadata);

        // round next fetch date - unless it is never
        if (nextFetch.isPresent()) {
            nextFetch = Optional.of(DateUtils.round(nextFetch.get(), this.roundDateUnit));
        }

        // extensions of this class will handle the storage
        // on a per document basis
        try {
            store(url, status, metadata, nextFetch, tuple);
        } catch (Exception e) {
            LOG.error("Exception caught when storing", e);
            _collector.fail(tuple);
        }
    }