in core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java [55:129]
/**
 * Computes a partition key for the incoming URL and emits it downstream, anchored on the input
 * tuple, as {@code (url, partitionKey, metadata)}; the input tuple is then acked.
 *
 * <p>The key depends on the configured {@code mode}:
 *
 * <ul>
 *   <li>host mode: the URL's host name;
 *   <li>domain mode: the paid-level domain of the host;
 *   <li>IP mode: the value of the {@code ip} metadata key if present and non-blank, otherwise
 *       the host's resolved IP address (looked up in {@code cache} first, then via DNS).
 * </ul>
 *
 * <p>Tuples whose URL cannot be parsed, or whose host cannot be resolved in IP mode, are acked
 * without being emitted so that they are not replayed.
 *
 * <p>NOTE(review): if {@code mode} matches none of the known partition modes, the tuple is
 * emitted with a {@code null} partition key — presumably {@code mode} is validated elsewhere
 * (e.g. in {@code prepare}); confirm.
 *
 * @param tuple input tuple carrying a {@code url} field and optionally a {@code metadata} field
 */
public void execute(Tuple tuple) {
    String url = tuple.getStringByField("url");

    // the "metadata" field may be missing altogether, or present but null
    Metadata metadata = null;
    if (tuple.contains("metadata")) {
        metadata = (Metadata) tuple.getValueByField("metadata");
    }
    if (metadata == null) {
        metadata = Metadata.empty;
    }

    final boolean partitionByIp = mode.equalsIgnoreCase(Constants.PARTITION_MODE_IP);

    String partitionKey = null;
    String host = "";

    // IP mode: an IP supplied in the metadata bypasses URL parsing and DNS resolution
    if (partitionByIp) {
        String providedIp = metadata.getFirstValue("ip");
        if (StringUtils.isNotBlank(providedIp)) {
            partitionKey = providedIp;
            eventCounter.scope("provided").incrBy(1);
        }
    }

    if (partitionKey == null) {
        try {
            host = new URL(url).getHost();
        } catch (MalformedURLException e1) {
            eventCounter.scope("Invalid URL").incrBy(1);
            LOG.warn("Invalid URL: {}", url);
            // ack it so that it doesn't get replayed
            _collector.ack(tuple);
            return;
        }
    }

    if (mode.equalsIgnoreCase(Constants.PARTITION_MODE_HOST)) {
        // partition by hostname
        partitionKey = host;
    } else if (mode.equalsIgnoreCase(Constants.PARTITION_MODE_DOMAIN)) {
        // partition by paid-level domain
        // NOTE(review): the original code flagged this branch as "needs fixing"; the reason is
        // not visible here — confirm what is outstanding.
        partitionKey = PaidLevelDomain.getPLD(host);
    }

    if (partitionByIp && partitionKey == null) {
        // try the cache before doing a (blocking) DNS lookup
        partitionKey = cache.get(host);
        if (partitionKey != null) {
            eventCounter.scope("from cache").incrBy(1);
        } else {
            try {
                // nanoTime is monotonic, unlike currentTimeMillis, so the duration cannot go
                // negative if the wall clock is adjusted during the lookup
                long start = System.nanoTime();
                // NOTE(review): per the JDK docs, an empty host (e.g. a hostless URL) resolves to
                // the loopback address here rather than failing — confirm this is acceptable.
                final InetAddress addr = InetAddress.getByName(host);
                partitionKey = addr.getHostAddress();
                long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
                LOG.debug("Resolved IP {} in {} msec for : {}", partitionKey, elapsedMs, url);
                cache.put(host, partitionKey);
            } catch (final Exception e) {
                eventCounter.scope("Unable to resolve IP").incrBy(1);
                LOG.warn("Unable to resolve IP for: {}", host);
                // keep the cause available without flooding WARN with stack traces
                LOG.debug("DNS resolution failure for {}", host, e);
                // ack it so that it doesn't get replayed
                _collector.ack(tuple);
                return;
            }
        }
    }

    LOG.debug("Partition Key for: {} > {}", url, partitionKey);
    _collector.emit(tuple, new Values(url, partitionKey, metadata));
    _collector.ack(tuple);
}