in metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java [96:169]
public static JSONObject process( Map<String, Object> message
, ConfigHandler handler
, String field
, Long slowLogThreshold
, StellarProcessor processor
, VariableResolver resolver
, Context stellarContext
)
{
JSONObject ret = new JSONObject();
Iterable<Map.Entry<String, Object>> stellarStatements = getStellarStatements(handler, field);
_LOG.debug("message := {}", message);
if(stellarStatements != null) {
List<String> mapEntries = new ArrayList<>();
for (Map.Entry<String, Object> kv : stellarStatements) {
if(kv.getKey() != null && kv.getValue() != null) {
if (kv.getValue() instanceof String) {
long startTime = System.currentTimeMillis();
String stellarStatement = (String) kv.getValue();
Object o = null;
try {
o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
}
catch(Exception e) {
_LOG.error(e.getMessage(), e);
throw e;
}
if (slowLogThreshold != null && _PERF_LOG.isDebugEnabled()) {
long duration = System.currentTimeMillis() - startTime;
if (duration > slowLogThreshold) {
_PERF_LOG.debug("SLOW LOG: {} took {} ms",stellarStatement,duration);
}
}
_LOG.debug("{} := {} yields {}", kv.getKey(), stellarStatement , o);
if (o != null && o instanceof Map) {
mapEntries.add(kv.getKey());
}
if(o == null) {
message.remove(kv.getKey());
ret.remove(kv.getKey());
}
else {
message.put(kv.getKey(), o);
ret.put(kv.getKey(), o);
}
}
}
}
/*
We need to handle the map entries separately now.
We want to explode them out, so if "var" is
{
"foo" : "bar"
}
then we want "var.foo" == "bar"
and no "var"
*/
for(String mapEntry : mapEntries) {
String key = mapEntry;
Map<Object, Object> value = (Map<Object, Object>) ret.get(key);
if(value != null) {
_LOG.debug("Exploding map: {} == {}", key, value);
for (Map.Entry<Object, Object> valueKv : value.entrySet()) {
String newKey = ((key.length() > 0) ? key + "." : "") + valueKv.getKey();
ret.put(newKey, valueKv.getValue());
}
//removing the map from downstream
ret.remove(key);
}
}
}
return ret;
}