in aristotle/aristotle.py [0:0]
def _pfmod_apply(self, pfmod_file, sids):
    """ Applies the directives in the PFMod YAML file to passed in SIDs.

    Files listed under the top-level ``include`` key are processed first by
    calling this method recursively (relative paths are resolved against the
    directory of ``pfmod_file``). Then, for every entry under ``rules``, the
    ``filter_string`` is evaluated with ``self.filter_ruleset()``, the result
    is intersected with ``sids`` and each entry of ``actions`` is applied to
    every matching SID by updating ``self.metadata_dict``.

    Actions handled here:

    * ``disable`` / ``enable`` -- set the SID's ``disabled`` flag.
    * ``add_metadata`` / ``add_metadata_exclusive`` -- value ``<key> <value>``.
    * ``delete_metadata`` -- value ``<key>`` or ``<key> <value>``.
    * ``copy_key`` -- value ``<key> <new_key>``.
    * ``regex_sub`` -- value ``/<search>/<replace>/`` (optional trailing ``i``).
    * ``set_<keyword>`` -- update a rule keyword listed in ``valid_set_keywords``.
    * ``set_<name_with_underscore>`` -- set an integer metadata key; a value with
      a leading ``+``/``-`` is relative and may carry a default (``+10,50``).

    :param pfmod_file: filename of PFMod file
    :type pfmod_file: string, required
    :param sids: list of sids to scope to
    :type sids: list, required
    :returns: SIDs matched by at least one rule of this file or of its includes
    :rtype: set
    """
    # see docs
    valid_actions_str = ["disable", "enable"]
    valid_actions_dict = ["add_metadata",
                          "add_metadata_exclusive",
                          "delete_metadata",
                          "regex_sub",
                          "copy_key",
                          # set_<keyword> actions not listed here (see below)
                          ]
    # Supported rule keywords (and defined type) that can be updated by PFMod, e.g. using "set_priority"
    valid_set_keywords = {'sid': {'type': 'int', 'default': 5000000},
                          'gid': {'type': 'int', 'default': 1},
                          'rev': {'type': 'int', 'default': 2},
                          'priority': {'type': 'int', 'default': 3},
                          'msg': {'type': 'str', 'default': None},
                          'reference': {'type': 'str_noquote', 'default': None},
                          'classtype': {'type': 'str_noquote', 'default': None},
                          'target': {'type': 'str_noquote', 'default': None},
                          'threshold': {'type': 'str_noquote', 'default': None},
                          'flow': {'type': 'str_noquote', 'default': None}
                          }
    # Matches "<keyword>: <value>;" preceded by '(' or ';'; PRE captures everything up to the value.
    keyword_re_template = r"(?P<PRE>[\x28\x3B]\s*{}\s*\x3A\s*)(?P<VALUE>[^\x3B]+)\x3B"
    matched_sids_all = set()
    print_debug("pfmod_apply() called")
    try:
        if not os.path.isfile(pfmod_file):
            print_error("Problem processing PFMod file '{}': file not found.".format(pfmod_file), fatal=True)
        with open(pfmod_file, 'r') as fh:
            pfmod_rules = yaml.safe_load(fh)
    except Exception as e:
        print_error("Unable to open PFMod YAML file '{}': {}".format(pfmod_file, e), fatal=True)
        # NOTE(review): print_error(fatal=True) presumably does not return; bail out if it does.
        return matched_sids_all
    if not isinstance(pfmod_rules, dict):
        print_error("Unexpected YAML format in file '{}'. Cannot continue.".format(pfmod_file), fatal=True)
        return matched_sids_all

    includes = pfmod_rules.get('include')
    if includes is not None:
        # Note: allowing for include directives creates a directed graph but checking is not done
        # to ensure it is acyclic. We could check to ensure this is a DAG but for now, it's the
        # responsibility of the user.
        if isinstance(includes, str):
            # a single scalar filename; do not iterate over its characters
            includes = [includes]
        for include_file in includes:
            if not os.path.isabs(include_file):
                include_file = os.path.join(os.path.dirname(pfmod_file), include_file)
            matched_sids_all.update(self._pfmod_apply(include_file, sids))
    elif "rules" not in pfmod_rules:
        print_error("No 'rules' directives defined in file '{}'.".format(pfmod_file), fatal=True)

    if pfmod_rules.get('version') is not None:
        print_debug("Processing PFMod rules file '{}', version {}.".format(os.path.basename(pfmod_file), pfmod_rules['version']))

    pfmod_rule_list = pfmod_rules.get('rules')
    if pfmod_rule_list is None:
        return matched_sids_all

    # the scope does not change between rules, so build the set once
    sid_scope = set(sids)
    for rule in pfmod_rule_list:
        rule_name = rule.get('name', "<undefined>")
        print_debug("Processing PFMod rule '{}'".format(rule_name))
        for required_key in ["filter_string", "actions"]:
            if required_key not in rule:
                print_error("No '{}' defined for PFMod rule '{}'".format(required_key, rule_name), fatal=True)
        try:
            matched_sids = self.filter_ruleset(rule['filter_string'])
        except Exception as e:
            print_error("Unable to apply filter string '{}' in PFMod rule named '{}': {}.".format(rule['filter_string'], rule_name, e), fatal=True)
        # only SIDs that are both in scope and selected by the filter are modified
        matched_sids = list(sid_scope & set(matched_sids))
        matched_sids_all.update(matched_sids)
        print_debug("Rule:\n\t{}\n\tModified: {}".format(rule_name, len(matched_sids)))
        for sid in matched_sids:
            for action in rule['actions']:
                if isinstance(action, str):
                    if action not in valid_actions_str:
                        print_error("Invalid action '{}' in PFMod rule named '{}'. Supported str actions are: {}.".format(action, rule_name, valid_actions_str))
                        continue
                    if action == 'disable':
                        self.metadata_dict[sid]['disabled'] = True
                    elif action == 'enable':
                        self.metadata_dict[sid]['disabled'] = False
                    else:
                        print_error("Action not implemented: '{}'.".format(action))
                        continue
                elif isinstance(action, dict):
                    # Iterate over items so the value is fetched with the key exactly as it
                    # appears in the YAML, even when the stripped name differs from it.
                    for raw_key, raw_value in action.items():
                        action_key = str(raw_key).strip()
                        if action_key not in valid_actions_dict and not action_key.startswith("set_"):
                            print_error("Invalid action found: '{}' in PFMod rule named '{}'. Supported dict actions are: '{}'.".format(action, rule_name, valid_actions_dict))
                            continue
                        if isinstance(raw_value, (dict, list)):
                            # every action takes a scalar; stringifying a container would yield garbage
                            print_error("Invalid value for action '{}' in PFMod rule '{}'.".format(action_key, rule_name))
                            continue
                        # YAML may deliver ints/floats/None; normalise to a stripped string once
                        action_value = "" if raw_value is None else str(raw_value).strip()
                        if not action_value:
                            print_error("No value for action '{}'.".format(action_key), fatal=True)
                            continue

                        if action_key == "copy_key":
                            a = [k.strip().lower() for k in action_value.split(' ') if k.strip()]
                            if len(a) != 2:
                                print_error("Invalid value for action '{}' in PFMod rule '{}'. "
                                            "Expected 2 arguments but received {}".format(action_key, rule_name, len(a)))
                            else:
                                key = a[0]
                                new_key = a[1]
                                if key == new_key:
                                    print_error("Invalid action found: '{}' in PFMod rule named '{}'. Old and new keyword names are both '{}'. "
                                                " You are doing it wrong.".format(action_key, rule_name, key))
                                    continue
                                # check if key exists
                                if key not in self.metadata_dict[sid]['metadata']:
                                    print_warning("PFMod rule named '{}': metadata key '{}' not found in SID {}. "
                                                  "Unable to perform action '{}'.".format(rule_name, key, sid, action_key))
                                    continue
                                # check if new key exists and leave it alone if it does
                                if new_key in self.metadata_dict[sid]['metadata']:
                                    print_warning("PFMod rule named '{}': destination metadata key '{}' already exists. "
                                                  "Not overwriting with action '{}'.".format(rule_name, new_key, action_key))
                                    continue
                                # copy key to new key
                                for v in self.metadata_dict[sid]['metadata'][key]:
                                    self.add_metadata(sid, new_key, v)
                        elif action_key.startswith("set_") and len(action_key.split('_')) > 2:
                            # set arbitrary integer-based metadata key if key name has an underscore in it; support relative values and default value.
                            key = action_key.split('_', 1)[1]
                            print_debug("PFMod: setting '{}' metadata key on SID {} ...".format(key, sid))
                            key_value = action_value
                            # A leading sign followed by at least one more character marks a relative
                            # adjustment; anything else (including single digits) is an absolute value.
                            if len(key_value) > 1 and key_value[0] in ('+', '-'):
                                key_value_orig = key_value
                                r = [x.strip() for x in key_value.split(',')]
                                default_value = None
                                if len(r) > 1:
                                    # default given
                                    default_value = r[1]
                                    try:
                                        default_value = int(default_value)
                                    except Exception as e:
                                        print_error("PFMod rule named '{}', action '{}': invalid default value '{}' (must be an integer).\n{}".format(
                                            rule_name, action_key, default_value, e), fatal=True)
                                try:
                                    key_value = int(r[0])
                                except Exception as e:
                                    print_error("PFMod rule named '{}', action '{}': invalid relative value '{}' (must be an integer).\n{}".format(
                                        rule_name, action_key, key_value, e), fatal=True)
                                # get existing value
                                if key in self.metadata_dict[sid]['metadata'] and len(self.metadata_dict[sid]['metadata'][key]) > 0:
                                    # grab the first value (if there are multiple entries)
                                    existing_value = self.metadata_dict[sid]['metadata'][key][0]
                                    try:
                                        existing_value = int(existing_value)
                                    except Exception as e:
                                        print_error("PFMod rule named '{}', action '{}': invalid exiting metadata value '{}' for key '{}' in SID '{}' (must be an integer). "
                                                    "Skipping. Error:\n{}".format(rule_name, action_key, existing_value, key, sid, e))
                                        continue
                                    key_value = existing_value + key_value
                                elif default_value is not None:
                                    # use default
                                    print_debug("PFMod rule named '{}': metadata key '{}' not found in SID {}. "
                                                "Using default value '{}'.".format(rule_name, key, sid, default_value))
                                    key_value = default_value
                                else:
                                    print_warning("PFMod rule named '{}': metadata key '{}' not found in SID {}. "
                                                  "Unable to add relative value '{}'.".format(rule_name, key, sid, key_value_orig))
                                    continue
                            try:
                                key_value = int(key_value)
                            except Exception as e:
                                print_error("PFMod rule named '{}', action '{}': invalid value '{}' (must be an integer).\n{}".format(rule_name, action_key, key_value, e), fatal=True)
                            # effectively make this "add_metadata_exclusive"
                            self.delete_metadata(sid, key)
                            self.add_metadata(sid, key, str(key_value))
                        elif action_key == "delete_metadata":
                            a = [k.strip().lower() for k in action_value.split(' ', 1) if k.strip()]
                            if len(a) < 2:
                                key = a[0]
                                print_debug("Deleting all metadata for key '{}'.".format(key))
                                self.delete_metadata(sid, key)
                            else:
                                key = a[0]
                                value = a[1]
                                print_debug("Deleting all metadata with key-value pair '{} {}'.".format(key, value))
                                self.delete_metadata(sid, key, value)
                        elif action_key.startswith("add_metadata"):
                            a = [k.strip().lower() for k in action_value.split(' ', 1) if k.strip()]
                            if len(a) != 2:
                                print_error("Invalid value for action '{}' in PFMod rule '{}'.".format(action_key, rule_name))
                            else:
                                key = a[0]
                                value = a[1]
                                if action_key.endswith("exclusive"):
                                    # "exclusive" replaces any existing values for the key
                                    self.delete_metadata(sid, key)
                                self.add_metadata(sid, key, value)
                        elif action_key.startswith("set_"):
                            keyword = action_key.split('_')[1]
                            if keyword not in valid_set_keywords:
                                print_error("Invalid PFMod action '{}'. Setting keyword '{}' not supported.".format(action_key, keyword))
                                continue
                            print_debug("PFMod: setting '{}' keyword on SID {} ...".format(keyword, sid))
                            keyword_value = action_value
                            # used both to read the current value and to rewrite it below
                            keyword_re = re.compile(keyword_re_template.format(keyword))
                            if valid_set_keywords[keyword]['type'] == 'int':
                                # 'int' keywords support leading '+' or '-' which will adjust the existing value of that keyword
                                # up (for '+') or down (for '-'). YAML must quote value with leading '+' or it will be treated as
                                # integer and the '+' won't be kept to be parsed here.
                                if len(keyword_value) > 1 and keyword_value[0] in ('+', '-'):
                                    keyword_value_orig = keyword_value
                                    try:
                                        keyword_value = int(keyword_value)
                                        # extract value
                                        matchobj = keyword_re.search(self.metadata_dict[sid]['raw_rule'])
                                        if not matchobj:
                                            print_warning("PFMod rule named '{}': keyword '{}' not found in SID {}. "
                                                          "Unable to add relative value '{}'.".format(rule_name, keyword, sid, keyword_value_orig))
                                            continue
                                        rule_value = int(matchobj.group("VALUE"))
                                        keyword_value = rule_value + keyword_value
                                        if (keyword in ["sid", "priority", "rev"] and keyword_value <= 0) or (keyword in ["gid"] and keyword_value < 0):
                                            # clamp to the smallest accepted value: 0 for gid, 1 otherwise
                                            keyword_value = 0 if keyword in ["gid"] else 1
                                            print_warning("PFMod rule named '{}': keyword '{}' relative adjustment of value by {} results in a value below what is allowed for SID {};"
                                                          " setting to minimum value of '{}'.".format(rule_name, keyword, keyword_value_orig, sid, keyword_value))
                                    except Exception as e:
                                        print_error("Invalid value '{}' for keyword '{}' in PFMod rule named '{}': {}".format(keyword_value_orig, keyword, rule_name, e), fatal=True)
                                # validate as int
                                try:
                                    keyword_value = int(keyword_value)
                                    if keyword in ["sid", "priority", "rev"] and keyword_value <= 0:
                                        # note: 'priority' on Suricata should be 1-255
                                        raise ValueError()
                                    # can add other validation checks here as necessary but ultimately, the responsibility for proper syntax
                                    # falls on the PFMod rule author.
                                except Exception as e:
                                    print_error("Invalid value '{}' for keyword '{}' in PFMod rule named '{}': {}".format(keyword_value, keyword, rule_name, e), fatal=True)
                            else:
                                # validate as string
                                badchars = ['"', '\\', ';']
                                try:
                                    for c in badchars:
                                        # Rough filter and cursory rule injection prevention.
                                        # Technically these chars could be included in some contexts when properly escaped.
                                        if c in keyword_value:
                                            raise ValueError("Character '{}' not supported in value for PFMod action '{}'.".format(c, action_key))
                                    if keyword == "target" and keyword_value not in ['src_ip', 'dest_ip']:
                                        raise ValueError()
                                    # can add other validation checks here as necessary but ultimately, the responsibility for proper syntax
                                    # falls on the PFMod rule author.
                                    if valid_set_keywords[keyword]['type'] == "str":
                                        # keyword value needs to be double quoted in the rule string
                                        keyword_value = '"{}"'.format(keyword_value)
                                except Exception as e:
                                    print_error("Invalid value '{}' for keyword '{}' in PFMod rule named '{}': {}".format(keyword_value, keyword, rule_name, e), fatal=True)
                            # update rule
                            if keyword_re.search(self.metadata_dict[sid]['raw_rule']):
                                print_debug("PFMod: Overwriting keyword '{}' with value '{}' for SID {}.".format(keyword, keyword_value, sid))
                                # a callable replacement inserts the value literally (no template escapes interpreted)
                                self.metadata_dict[sid]['raw_rule'] = keyword_re.sub(
                                    lambda m: m.group('PRE') + str(keyword_value) + ';',
                                    self.metadata_dict[sid]['raw_rule'])
                            else:
                                # given keyword not in original rule; add one.
                                print_debug("PFMod: Adding keyword '{}' with value '{}' for SID {}.".format(keyword, keyword_value, sid))
                                keyword_string = " {}:{};)".format(keyword, keyword_value)
                                self.metadata_dict[sid]['raw_rule'] = eol_re.sub(keyword_string, self.metadata_dict[sid]['raw_rule'])
                        elif action_key == "regex_sub":
                            v = action_value
                            re_flag = 0
                            re_v = v
                            # NOTE(review): any value ending in 'i' is treated as carrying the
                            # ignore-case flag, even without a preceding '/'.
                            if v.endswith('i'):
                                re_flag = re.I
                                re_v = v[:-1]
                            try:
                                search_string, replace_string = re_v.strip().strip('/').split('/', 1)
                                pattern_re = re.compile(search_string, flags=re_flag)
                                self.metadata_dict[sid]['raw_rule'] = pattern_re.sub(replace_string, self.metadata_dict[sid]['raw_rule'])
                            except Exception as e:
                                print_error("Problem processing '{}' value '{}' in PFMod rule named '{}': {}".format(action_key, v, rule_name, e))
                                continue
                        else:
                            # not reached
                            print_error("Invalid action found: '{}' in PFMod rule named '{}'. Supported dict actions are: '{}'.".format(action, rule_name, valid_actions_dict), fatal=True)
                else:
                    print_error("Invalid action data type '{}' in PFMod rule named '{}'.".format(type(action), rule_name))
                    continue
    return matched_sids_all