public static JSONObject process()

in metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java [96:169]


  public static JSONObject process( Map<String, Object> message
                                           , ConfigHandler handler
                                           , String field
                                           , Long slowLogThreshold
                                           , StellarProcessor processor
                                           , VariableResolver resolver
                                           , Context stellarContext
                                           )
  {
    JSONObject ret = new JSONObject();
    Iterable<Map.Entry<String, Object>> stellarStatements = getStellarStatements(handler, field);

    _LOG.debug("message := {}", message);
    if(stellarStatements != null) {
      List<String> mapEntries = new ArrayList<>();
      for (Map.Entry<String, Object> kv : stellarStatements) {
        if(kv.getKey() != null && kv.getValue() != null) {
          if (kv.getValue() instanceof String) {
            long startTime = System.currentTimeMillis();
            String stellarStatement = (String) kv.getValue();
            Object o = null;
            try {
              o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
            }
            catch(Exception e) {
              _LOG.error(e.getMessage(), e);
              throw e;
            }
            if (slowLogThreshold != null && _PERF_LOG.isDebugEnabled()) {
              long duration = System.currentTimeMillis() - startTime;
              if (duration > slowLogThreshold) {
                _PERF_LOG.debug("SLOW LOG: {} took {} ms",stellarStatement,duration);
              }
            }
            _LOG.debug("{} := {} yields {}", kv.getKey(), stellarStatement , o);
            if (o != null && o instanceof Map) {
              mapEntries.add(kv.getKey());
            }
            if(o == null) {
              message.remove(kv.getKey());
              ret.remove(kv.getKey());
            }
            else {
              message.put(kv.getKey(), o);
              ret.put(kv.getKey(), o);
            }
          }
        }
      }
      /*
      We need to handle the map entries separately now.
      We want to explode them out, so if "var" is
      {
        "foo" : "bar"
      }
      then we want "var.foo" == "bar"
      and no "var"
       */
      for(String mapEntry : mapEntries) {
        String key = mapEntry;
        Map<Object, Object> value = (Map<Object, Object>) ret.get(key);
        if(value != null) {
          _LOG.debug("Exploding map: {} == {}", key, value);
          for (Map.Entry<Object, Object> valueKv : value.entrySet()) {
            String newKey = ((key.length() > 0) ? key + "." : "") + valueKv.getKey();
            ret.put(newKey, valueKv.getValue());
          }
          //removing the map from downstream
          ret.remove(key);
        }
      }
    }
    return ret;
  }