public List parse()

in metron-platform/metron-parsing/metron-parsers/src/main/java/org/apache/metron/parsers/bro/BasicBroParser.java [58:148]


  /**
   * Parses one raw Bro message into a single normalized JSON record.
   *
   * <p>The bytes are decoded with {@code getReadCharset()} and handed to {@code cleaner}. The
   * payload is then located in one of two layouts:
   * <ul>
   *   <li>if the cleaned object has a top-level {@code "type"} entry, the cleaned object itself
   *       is the payload and the value of {@code "type"} is the protocol key;</li>
   *   <li>otherwise the first top-level key is the protocol key and the object stored under it
   *       is the payload.</li>
   * </ul>
   *
   * <p>The payload is modified in place: an {@code original_string} summary is added, the
   * timestamp field is converted through {@code convertToMillis} (the formatted source value is
   * kept under {@code bro_timestamp}), address and port fields are passed through
   * {@code replaceKey}/{@code replaceKeyArray}, and the protocol key is stored under
   * {@code Constants.Fields.PROTOCOL}.
   *
   * @param msg raw message bytes
   * @return a list containing exactly the one parsed payload
   * @throws IllegalStateException if any step fails; the underlying exception is attached as the
   *     cause and the raw message text is included in the exception message
   */
  public List<JSONObject> parse(byte[] msg) {

    _LOG.trace("[Metron] Starting to parse incoming message");

    // Declared outside the try so the catch block can report it (null if decoding itself failed).
    String rawMessage = null;
    List<JSONObject> messages = new ArrayList<>();
    try {
      rawMessage = new String(msg, getReadCharset());
      _LOG.trace("[Metron] Received message: {}", rawMessage);

      JSONObject cleanedMessage = cleaner.clean(rawMessage);
      _LOG.debug("[Metron] Cleaned message: {}", cleanedMessage);

      if (cleanedMessage == null || cleanedMessage.isEmpty()) {
        throw new Exception("Unable to clean message: " + rawMessage);
      }

      String key;
      JSONObject payload;
      if (cleanedMessage.containsKey("type")) {
        // Flat layout: the record carries its own type and is the payload.
        Object type = cleanedMessage.get("type");
        if (type == null) {
          throw new Exception("Unable to retrieve key for message: "
                  + rawMessage);
        }
        key = type.toString();
        payload = cleanedMessage;
      } else {
        // Wrapped layout: the first top-level key names the type and maps to the payload.
        // The map is known to be non-empty here, so next() cannot run off the end.
        Object firstKey = cleanedMessage.keySet().iterator().next();
        if (firstKey == null) {
          throw new Exception("Unable to retrieve key for message: "
                  + rawMessage);
        }
        key = firstKey.toString();
        payload = (JSONObject) cleanedMessage.get(key);
      }

      if (payload == null) {
        throw new Exception("Unable to retrieve payload for message: "
                + rawMessage);
      }

      // Build "<KEY> | k1:v1 k2:v2 ..." from the payload as it looks before any key renaming.
      // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
      StringBuilder originalString =
              new StringBuilder(key.toUpperCase(java.util.Locale.ROOT)).append(" |");
      for (Object k : payload.keySet()) {
        Object raw = payload.get(k);
        // Doubles go through DECIMAL_FORMAT; everything else (including JSON null) is
        // rendered with String.valueOf so a null field cannot abort the whole record.
        String value = (raw instanceof Double)
                ? DECIMAL_FORMAT.get().format(raw)
                : String.valueOf(raw);
        originalString.append(' ').append(k).append(':').append(value);
      }
      payload.put("original_string", originalString.toString());

      // NOTE(review): replaceKey presumably moves the first present source key ("ts") to the
      // target name and reports whether it did so - confirm against its definition.
      replaceKey(payload, Constants.Fields.TIMESTAMP.getName(), new String[]{ "ts" });

      if (payload.containsKey(Constants.Fields.TIMESTAMP.getName())) {
        try {
          // The cast throws ClassCastException (not NumberFormatException) for a non-numeric
          // value; that is not handled here and reaches the outer catch, rejecting the message.
          Double broTimestamp = ((Number) payload.get(Constants.Fields.TIMESTAMP.getName())).doubleValue();
          String broTimestampFormatted = DECIMAL_FORMAT.get().format(broTimestamp);
          long timestamp = convertToMillis(broTimestamp);
          payload.put(Constants.Fields.TIMESTAMP.getName(), timestamp);
          payload.put("bro_timestamp", broTimestampFormatted);
          _LOG.trace("[Metron] new bro record - timestamp : {}", () -> payload.get(Constants.Fields.TIMESTAMP.getName()));
        } catch (NumberFormatException nfe) {
          // Fall back to 0 rather than rejecting the record.
          _LOG.error("[Metron] timestamp is invalid: {}", payload.get(Constants.Fields.TIMESTAMP.getName()));
          payload.put(Constants.Fields.TIMESTAMP.getName(), 0);
        }
      }

      // Source address: try the scalar fields first, then the "tx_hosts" array field.
      boolean ipSrcReplaced = replaceKey(payload, Constants.Fields.SRC_ADDR.getName(), new String[]{"source_ip", "id.orig_h"});
      if (!ipSrcReplaced) {
        replaceKeyArray(payload, Constants.Fields.SRC_ADDR.getName(), new String[]{ "tx_hosts" });
      }

      // Destination address: same approach, with "rx_hosts" as the array fallback.
      boolean ipDstReplaced = replaceKey(payload, Constants.Fields.DST_ADDR.getName(), new String[]{"dest_ip", "id.resp_h"});
      if (!ipDstReplaced) {
        replaceKeyArray(payload, Constants.Fields.DST_ADDR.getName(), new String[]{ "rx_hosts" });
      }

      replaceKey(payload, Constants.Fields.SRC_PORT.getName(), new String[]{"source_port", "id.orig_p"});
      replaceKey(payload, Constants.Fields.DST_PORT.getName(), new String[]{"dest_port", "id.resp_p"});

      // The protocol is stored with the key's original casing (only original_string upper-cases it).
      payload.put(Constants.Fields.PROTOCOL.getName(), key);
      _LOG.debug("[Metron] Returning parsed message: {}", payload);
      messages.add(payload);
      return messages;

    } catch (Exception e) {
      // Single failure boundary: log, then rethrow unchecked with the cause preserved.
      String message = "Unable to parse Message: " + rawMessage;
      _LOG.error(message, e);
      throw new IllegalStateException(message, e);
    }

  }