public void execute()

in core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java [55:129]


    /**
     * Computes a partition key for the URL carried by the tuple and emits it downstream.
     *
     * <p>The tuple must provide a {@code url} field and may provide a {@code metadata} field.
     * Depending on the configured {@code mode}, the key is the host name of the URL, the value
     * returned by {@code PaidLevelDomain.getPLD} for that host, or an IP address. In IP mode the
     * address is taken, in order of preference, from the {@code ip} metadata value, from the
     * local cache, or from a lookup of the host name (the result of which is then cached).
     *
     * <p>On success a {@code (url, partitionKey, metadata)} tuple anchored on the input is
     * emitted and the input is acked. If the URL is malformed or the host cannot be resolved,
     * the event is counted and logged and the input is acked without anything being emitted.
     *
     * @param tuple the incoming tuple holding the URL and, optionally, its metadata
     */
    public void execute(Tuple tuple) {
        final String url = tuple.getStringByField("url");

        // the metadata field can be absent altogether, or present but null;
        // both cases fall back to the shared empty instance
        Metadata metadata =
                tuple.contains("metadata") ? (Metadata) tuple.getValueByField("metadata") : null;
        if (metadata == null) {
            metadata = Metadata.empty;
        }

        final boolean partitionByIp = mode.equalsIgnoreCase(Constants.PARTITION_MODE_IP);

        String partitionKey = null;

        // in IP mode, an address supplied in the metadata takes precedence
        if (partitionByIp) {
            final String providedIp = metadata.getFirstValue("ip");
            if (StringUtils.isNotBlank(providedIp)) {
                partitionKey = providedIp;
                eventCounter.scope("provided").incrBy(1);
            }
        }

        // the host is only needed when no key has been determined yet
        String host = "";
        if (partitionKey == null) {
            try {
                host = new URL(url).getHost();
            } catch (MalformedURLException malformed) {
                eventCounter.scope("Invalid URL").incrBy(1);
                LOG.warn("Invalid URL: {}", url);
                // ack so that the tuple does not get replayed
                _collector.ack(tuple);
                return;
            }
        }

        if (mode.equalsIgnoreCase(Constants.PARTITION_MODE_HOST)) {
            // partition by hostname
            partitionKey = host;
        } else if (mode.equalsIgnoreCase(Constants.PARTITION_MODE_DOMAIN)) {
            // partition by domain : needs fixing
            partitionKey = PaidLevelDomain.getPLD(host);
        }

        // partition by IP when the metadata did not supply one
        if (partitionByIp && partitionKey == null) {
            final String cachedIp = cache.get(host);
            if (cachedIp != null) {
                partitionKey = cachedIp;
                eventCounter.scope("from cache").incrBy(1);
            } else {
                try {
                    final long startMillis = System.currentTimeMillis();
                    partitionKey = InetAddress.getByName(host).getHostAddress();
                    final long elapsedMillis = System.currentTimeMillis() - startMillis;
                    LOG.debug(
                            "Resolved IP {} in {} msec for : {}", partitionKey, elapsedMillis, url);

                    // remember the address for subsequent URLs on the same host
                    cache.put(host, partitionKey);
                } catch (final Exception e) {
                    eventCounter.scope("Unable to resolve IP").incrBy(1);
                    LOG.warn("Unable to resolve IP for: {}", host);
                    // ack without emitting: the URL is dropped rather than replayed
                    _collector.ack(tuple);
                    return;
                }
            }
        }

        LOG.debug("Partition Key for: {} > {}", url, partitionKey);

        _collector.emit(tuple, new Values(url, partitionKey, metadata));
        _collector.ack(tuple);
    }