in metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java [181:258]
/**
 * Enriches each field of an incoming message fragment and emits the combined result on the
 * {@code enrichmentType} stream.
 *
 * <p>The tuple is read for a string field {@code "key"} and a {@code JSONObject} field
 * {@code "message"}. The emitted message always carries two adapter timestamps
 * ({@code adapter.<adapter class name>.begin.ts} / {@code .end.ts}, stored as strings), the
 * value of {@code Constants.SENSOR_TYPE} copied verbatim, and whatever
 * {@code EnrichmentUtils.adjustKeys} merges in for every other field.
 *
 * <p>Error handling happens at two levels:
 * <ul>
 *   <li>Message-level problems (null/empty message, null key, missing sensor type, or any
 *       exception not handled per field) are passed to
 *       {@code handleError(key, rawMessage, subGroup, enrichedMessage, e)}.</li>
 *   <li>Field-level problems (no {@code SensorEnrichmentConfig} for the sensor type, or a
 *       failure while enriching one field) are reported through
 *       {@code StormErrorUtils.handleError} and that field is skipped; the remaining fields
 *       are still processed.</li>
 * </ul>
 *
 * @param tuple the tuple providing the {@code "key"} and {@code "message"} fields
 */
public void execute(Tuple tuple) {
  perfLog.mark("execute");
  String key = tuple.getStringByField("key");
  JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
  String subGroup = "";
  JSONObject enrichedMessage = new JSONObject();
  // Shared prefix of the begin/end timestamp keys; computed once instead of per timestamp.
  String adapterKeyPrefix = "adapter." + adapter.getClass().getSimpleName().toLowerCase();
  enrichedMessage.put(adapterKeyPrefix + ".begin.ts", "" + System.currentTimeMillis());
  try {
    if (rawMessage == null || rawMessage.isEmpty()) {
      throw new Exception("Could not parse binary stream to JSON");
    }
    if (key == null) {
      throw new Exception("Key is not valid");
    }
    if (!rawMessage.containsKey(Constants.SENSOR_TYPE)) {
      throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
    }
    String sourceType = rawMessage.get(Constants.SENSOR_TYPE).toString();

    // Declared outside the loop: a field with a null value reuses the prefix left by the
    // previous enriched field when it reaches adjustKeys (same as the original behavior).
    String prefix = null;
    for (Object o : rawMessage.keySet()) {
      String field = (String) o;
      Object value = rawMessage.get(field);
      if (field.equals(Constants.SENSOR_TYPE)) {
        // The sensor type is passed through untouched rather than enriched.
        enrichedMessage.put(Constants.SENSOR_TYPE, value);
        continue;
      }

      JSONObject enrichedField = new JSONObject();
      if (value != null) {
        SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
        if (config == null) {
          LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
          MetronError metronError = new MetronError()
                  .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                  .withMessage("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType)
                  .addRawMessage(rawMessage);
          StormErrorUtils.handleError(collector, metronError);
          continue;
        }
        config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
        CacheKey cacheKey = new CacheKey(field, value, config);
        try {
          adapter.logAccess(cacheKey);
          prefix = adapter.getOutputPrefix(cacheKey);
          subGroup = adapter.getStreamSubGroup(enrichmentType, field);
          perfLog.mark("enrich");
          enrichedField = cache.get(cacheKey);
          perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType);
          if (enrichedField == null) {
            throw new Exception("[Metron] Could not enrich string: "
                    + value);
          }
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
          // A plain typed set instead of raw double-brace initialization: no anonymous
          // HashSet subclass per failure and no hidden reference to this bolt instance.
          HashSet<String> errorFields = new HashSet<>();
          errorFields.add(field);
          MetronError metronError = new MetronError()
                  .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                  .withThrowable(e)
                  .withErrorFields(errorFields)
                  .addRawMessage(rawMessage);
          StormErrorUtils.handleError(collector, metronError);
          // Skip only this field; the rest of the message is still enriched.
          continue;
        }
      }
      enrichedMessage = EnrichmentUtils.adjustKeys(enrichedMessage, enrichedField, field, prefix);
    }

    enrichedMessage.put(adapterKeyPrefix + ".end.ts", "" + System.currentTimeMillis());
    if (!enrichedMessage.isEmpty()) {
      collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup));
    }
  } catch (Exception e) {
    handleError(key, rawMessage, subGroup, enrichedMessage, e);
  }
  perfLog.log("execute", "key={}, elapsed time to run execute", key);
}