public EnrichmentResult apply()

in metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java [115:217]


  public EnrichmentResult apply( JSONObject message
                         , EnrichmentStrategies strategy
                         , SensorEnrichmentConfig config
                         , PerformanceLogger perfLog
                         ) throws ExecutionException, InterruptedException {
    if(message == null) {
      return null;
    }
    if(perfLog != null) {
      perfLog.mark("execute");
      if(perfLog.isDebugEnabled() && !cacheStats.isEmpty()) {
        CacheStats before =  cacheStats.get(strategy);
        CacheStats after = concurrencyContext.getCache().stats();
        if(before != null && after != null) {
          CacheStats delta = after.minus(before);
          perfLog.log("cache", delta.toString());
        }
        cacheStats.put(strategy, after);
      }
    }
    String sensorType = MessageUtils.getSensorType(message);
    message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
    // Split the message into individual tasks.
    //
    // A task will either correspond to an enrichment adapter or,
    // in the case of Stellar, a stellar subgroup.  The tasks will be grouped by enrichment type (the key of the
    //tasks map).  Each JSONObject will correspond to a unit of work.
    Map<String, List<JSONObject>> tasks = splitMessage( message
                                                      , strategy
                                                      , config
                                                      );
    message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis());
    message.put(getClass().getSimpleName().toLowerCase() + ".enrich.begin.ts", "" + System.currentTimeMillis());
    if(perfLog != null) {
      perfLog.mark("enrich");
    }
    List<CompletableFuture<JSONObject>> taskList = new ArrayList<>();
    List<Map.Entry<Object, Throwable>> errors = Collections.synchronizedList(new ArrayList<>());
    for(Map.Entry<String, List<JSONObject>> task : tasks.entrySet()) {
      //task is the list of enrichment tasks for the task.getKey() adapter
      EnrichmentAdapter<CacheKey> adapter = enrichmentsByType.get(task.getKey());
      if(adapter == null) {
        throw new IllegalStateException("Unable to find an adapter for " + task.getKey()
                + ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet()));
      }
      message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
      for(JSONObject m : task.getValue()) {
        /* now for each unit of work (each of these only has one element in them)
         * the key is the field name and the value is value associated with that field.
         *
         * In the case of stellar enrichment, the field name is the subgroup name or empty string.
         * The value is the subset of the message needed for the enrichment.
         *
         * In the case of another enrichment (e.g. hbase), the field name is the field name being enriched.
         * The value is the corresponding value.
         */
        for(Object o : m.keySet()) {
          String field = (String) o;
          Object value = m.get(o);
          if(value == null) {
            message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
            continue;
          }
          CacheKey cacheKey = new CacheKey(field, value, config);
          String prefix = adapter.getOutputPrefix(cacheKey);
          Supplier<JSONObject> supplier = () -> {
            try {
              JSONObject ret = concurrencyContext.getCache().get(cacheKey, new EnrichmentCallable(cacheKey, adapter));
              if(ret == null) {
                ret = new JSONObject();
              }
              //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields.
              JSONObject adjustedKeys = EnrichmentUtils
                  .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
              adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
              return adjustedKeys;
            } catch (Throwable e) {
              JSONObject errorMessage = new JSONObject();
              errorMessage.putAll(m);
              errorMessage.put(Constants.SENSOR_TYPE, sensorType );
              errors.add(new AbstractMap.SimpleEntry<>(errorMessage, new IllegalStateException(strategy + " error with " + task.getKey() + " failed: " + e.getMessage(), e)));
              return new JSONObject();
            }
          };
          //add the Future to the task list
          taskList.add(CompletableFuture.supplyAsync( supplier, ConcurrencyContext.getExecutor()));
        }
      }
    }
    if(taskList.isEmpty()) {
      message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
      return new EnrichmentResult(message, errors);
    }

    EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors);
    ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
    if(perfLog != null) {
      String key = message.get(Constants.GUID) + "";
      perfLog.log("enrich", "key={}, elapsed time to enrich", key);
      perfLog.log("execute", "key={}, elapsed time to run execute", key);
    }
    return ret;
  }