in detection_rules/config.py [0:0]
def parse_rules_config(path: Optional[Path] = None) -> RulesConfig:
"""Parse the _config.yaml file for default or custom rules."""
if path:
assert path.exists(), f'rules config file does not exist: {path}'
loaded = yaml.safe_load(path.read_text())
elif CUSTOM_RULES_DIR:
path = Path(CUSTOM_RULES_DIR) / '_config.yaml'
if not path.exists():
raise FileNotFoundError(
"""
Configuration file not found.
Please create a configuration file. You can use the 'custom-rules setup-config' command
and update the 'CUSTOM_RULES_DIR' environment variable as needed.
"""
)
loaded = yaml.safe_load(path.read_text())
else:
path = Path(get_etc_path('_config.yaml'))
loaded = load_etc_dump('_config.yaml')
try:
ConfigFile.from_dict(loaded)
except KeyError as e:
raise SystemExit(f'Missing key `{str(e)}` in _config.yaml file.')
except (AttributeError, TypeError):
raise SystemExit(f'No data properly loaded from {path}')
except ValueError as e:
raise SystemExit(e)
base_dir = path.resolve().parent
# testing
# precedence to the environment variable
# environment variable is absolute path and config file is relative to the _config.yaml file
test_config_ev = os.getenv('DETECTION_RULES_TEST_CONFIG', None)
if test_config_ev:
test_config_path = Path(test_config_ev)
else:
test_config_file = loaded.get('testing', {}).get('config')
if test_config_file:
test_config_path = base_dir.joinpath(test_config_file)
else:
test_config_path = None
if test_config_path:
test_config_data = yaml.safe_load(test_config_path.read_text())
# overwrite None with empty list to allow implicit exemption of all tests with `test_only` defined to None in
# test config
if 'unit_tests' in test_config_data and test_config_data['unit_tests'] is not None:
test_config_data['unit_tests'] = {k: v or [] for k, v in test_config_data['unit_tests'].items()}
test_config = TestConfig.from_dict(test_file=test_config_path, **test_config_data)
else:
test_config = TestConfig.from_dict()
# files
# paths are relative
files = {f'{k}_file': base_dir.joinpath(v) for k, v in loaded['files'].items()}
contents = {k: load_dump(str(base_dir.joinpath(v).resolve())) for k, v in loaded['files'].items()}
contents.update(**files)
# directories
# paths are relative
if loaded.get('directories'):
contents.update({k: base_dir.joinpath(v).resolve() for k, v in loaded['directories'].items()})
# rule_dirs
# paths are relative
contents['rule_dirs'] = [base_dir.joinpath(d).resolve() for d in loaded.get('rule_dirs')]
# directories
# paths are relative
if loaded.get('directories'):
directories = loaded.get('directories')
if directories.get('exception_dir'):
contents['exception_dir'] = base_dir.joinpath(directories.get('exception_dir')).resolve()
if directories.get('action_dir'):
contents['action_dir'] = base_dir.joinpath(directories.get('action_dir')).resolve()
if directories.get('action_connector_dir'):
contents['action_connector_dir'] = base_dir.joinpath(directories.get('action_connector_dir')).resolve()
# version strategy
contents['bypass_version_lock'] = loaded.get('bypass_version_lock', False)
# bbr_rules_dirs
# paths are relative
if loaded.get('bbr_rules_dirs'):
contents['bbr_rules_dirs'] = [base_dir.joinpath(d).resolve() for d in loaded.get('bbr_rules_dirs', [])]
# kql keyword normalization
contents['normalize_kql_keywords'] = loaded.get('normalize_kql_keywords', True)
if loaded.get('auto_gen_schema_file'):
contents['auto_gen_schema_file'] = base_dir.joinpath(loaded['auto_gen_schema_file'])
# Check if the file exists
if not contents['auto_gen_schema_file'].exists():
# If the file doesn't exist, create the necessary directories and file
contents['auto_gen_schema_file'].parent.mkdir(parents=True, exist_ok=True)
contents['auto_gen_schema_file'].write_text('{}')
# bypass_optional_elastic_validation
contents['bypass_optional_elastic_validation'] = loaded.get('bypass_optional_elastic_validation', False)
if contents['bypass_optional_elastic_validation']:
set_all_validation_bypass(contents['bypass_optional_elastic_validation'])
# no_tactic_filename
contents['no_tactic_filename'] = loaded.get('no_tactic_filename', False)
# return the config
try:
rules_config = RulesConfig(test_config=test_config, **contents)
except (ValueError, TypeError) as e:
raise SystemExit(f'Error parsing packages.yaml: {str(e)}')
return rules_config