in core/src/main/java/org/apache/stormcrawler/indexing/AbstractIndexerBolt.java [134:183]
/**
 * Loads the indexing-related settings from the configuration into this bolt's fields.
 *
 * <p>Reads, in order: the optional metadata filter (a single {@code key=value} pair), the name of
 * the text field and the maximum text length, the name of the URL field, the canonical metadata
 * name, the list of metadata-to-field mappings and the flag controlling whether empty field values
 * are ignored.
 *
 * <p>Each mapping entry is either {@code key=field} or just {@code key}; in the latter case the
 * field name passed to {@code Key} is {@code null}. A bracketed number in the key (for instance
 * {@code name[2]}) is extracted as an index and the key is cut at the opening bracket.
 *
 * @param conf configuration map the settings are read from
 * @param context not used by this implementation
 * @param collector not used by this implementation
 */
public void prepare(
        Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
    final String filterSpec = ConfUtils.getString(conf, metadataFilterParamName);
    if (StringUtils.isNotBlank(filterSpec)) {
        // the filter is expected as key=value; only the first '=' acts as separator
        final int sep = filterSpec.indexOf('=');
        if (sep == -1) {
            LOG.error("Can't split into key value : {}", filterSpec);
        } else {
            final String filterKey = filterSpec.substring(0, sep).trim();
            final String filterValue = filterSpec.substring(sep + 1).trim();
            filterKeyValue = new String[] {filterKey, filterValue};
        }
    }

    fieldNameForText = ConfUtils.getString(conf, textFieldParamName);
    maxLengthText = ConfUtils.getInt(conf, textLengthParamName, -1);
    fieldNameForURL = ConfUtils.getString(conf, urlFieldParamName);
    canonicalMetadataName = ConfUtils.getString(conf, canonicalMetadataParamName);

    // recognises a bracketed number such as [2] within a mapping key
    final Pattern bracketedIndex = Pattern.compile("\\[(\\d+)\\]");
    for (final String entry : ConfUtils.loadListFromConf(metadata2fieldParamName, conf)) {
        final int sep = entry.indexOf('=');
        final boolean hasFieldName = sep != -1;

        // without '=' the whole (trimmed) entry is the key and no field name is given
        String metadataKey = hasFieldName ? entry.substring(0, sep).trim() : entry.trim();
        final String fieldName = hasFieldName ? entry.substring(sep + 1).trim() : null;

        // -1 is used when the key carries no bracketed index
        int valueIndex = -1;
        final Matcher indexMatcher = bracketedIndex.matcher(metadataKey);
        if (indexMatcher.find()) {
            valueIndex = Integer.parseInt(indexMatcher.group(1));
            // keep only what precedes the opening bracket
            metadataKey = metadataKey.substring(0, indexMatcher.start());
        }

        metadata2field.add(new Key(metadataKey, valueIndex, fieldName));
        LOG.info("Mapping key {} to field {}", metadataKey, fieldName);
    }

    // the current value of the field serves as the default when the setting is absent
    ignoreEmptyFields =
            ConfUtils.getBoolean(conf, ignoreEmptyFieldValueParamName, ignoreEmptyFields);
}