public void execute()

in metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java [181:258]


  public void execute(Tuple tuple) {
    perfLog.mark("execute");
    String key = tuple.getStringByField("key");
    JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
    String subGroup = "";

    JSONObject enrichedMessage = new JSONObject();
    enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
    try {
      if (rawMessage == null || rawMessage.isEmpty())
        throw new Exception("Could not parse binary stream to JSON");
      if (key == null)
        throw new Exception("Key is not valid");
      String sourceType = null;
      if(rawMessage.containsKey(Constants.SENSOR_TYPE)) {
        sourceType = rawMessage.get(Constants.SENSOR_TYPE).toString();
      }
      else {
        throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
      }
      String prefix = null;
      for (Object o : rawMessage.keySet()) {
        String field = (String) o;
        Object value =  rawMessage.get(field);
        if (field.equals(Constants.SENSOR_TYPE)) {
          enrichedMessage.put(Constants.SENSOR_TYPE, value);
        } else {
          JSONObject enrichedField = new JSONObject();
          if (value != null) {
            SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
            if(config == null) {
              LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
              MetronError metronError = new MetronError()
                      .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                      .withMessage("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType)
                      .addRawMessage(rawMessage);
              StormErrorUtils.handleError(collector, metronError);
              continue;
            }
            config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
            CacheKey cacheKey= new CacheKey(field, value, config);
            try {
              adapter.logAccess(cacheKey);
              prefix = adapter.getOutputPrefix(cacheKey);
              subGroup = adapter.getStreamSubGroup(enrichmentType, field);

              perfLog.mark("enrich");
              enrichedField = cache.get(cacheKey);
              perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType);

              if (enrichedField == null)
                throw new Exception("[Metron] Could not enrich string: "
                        + value);
            }
            catch(Exception e) {
              LOG.error(e.getMessage(), e);
              MetronError metronError = new MetronError()
                      .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                      .withThrowable(e)
                      .withErrorFields(new HashSet() {{ add(field); }})
                      .addRawMessage(rawMessage);
              StormErrorUtils.handleError(collector, metronError);
              continue;
            }
          }
          enrichedMessage = EnrichmentUtils.adjustKeys(enrichedMessage, enrichedField, field, prefix);
        }
      }

      enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
      if (!enrichedMessage.isEmpty()) {
        collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup));
      }
    } catch (Exception e) {
      handleError(key, rawMessage, subGroup, enrichedMessage, e);
    }
    perfLog.log("execute", "key={}, elapsed time to run execute", key);
  }