in detection_rules/cli_utils.py [0:0]
def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbose=False,
                additional_required: Optional[list] = None, skip_errors: bool = False, strip_none_values=True,
                **kwargs) -> typing.Union[TOMLRule, str]:
    """Build a rule from supplied values, prompting for whatever is missing.

    The JSON schema of the data class matching the rule type drives the loop: every schema
    property is either taken from ``kwargs``, prompted for through ``schema_prompt``, or skipped.

    Args:
        path: Destination of the rule file. When falsy the user is asked for one via ``input``,
            with a suggestion built from ``DEFAULT_PREBUILT_RULES_DIRS[0]`` and the rule name.
            If the resolved path already exists, the maturity of the rule stored there is reused.
        rule_type: Rule type; falls back to ``kwargs['type']`` and then to an interactive prompt.
        required_only: When true, schema properties that are not required are skipped.
        save: When true, the built rule is written with ``save_toml``.
        verbose: When true (and ``path`` is set), echo which path is being built.
        additional_required: Extra property names to treat as required in addition to the schema's.
        skip_errors: When true, never prompt for missing fields or MITRE tactics; instead return a
            string describing the first problem encountered.
        strip_none_values: Forwarded to ``save_toml``.
        **kwargs: Pre-supplied field values. If both ``rule`` and ``metadata`` keys are present,
            their contents are flattened into the top level (``rule`` keys win over ``metadata``).

    Returns:
        The constructed ``TOMLRule``, or - only when ``skip_errors`` is true and a field is missing
        or the rule fails to build - a human readable error string.

    Raises:
        KeyError: If a ``new_terms`` property is processed without ``new_terms_fields`` in ``kwargs``,
            or if no ``name`` ended up in the rule contents.
        Exception: Any non-KQL error raised while constructing the rule is re-raised unchanged
            when ``skip_errors`` is false.
    """
    from .misc import schema_prompt

    additional_required = additional_required or []
    creation_date = datetime.date.today().strftime("%Y/%m/%d")
    if verbose and path:
        click.echo(f'[+] Building rule for {path}')

    # deep copy so nested values handed in by the caller are never mutated by the pops below
    kwargs = copy.deepcopy(kwargs)

    # NOTE: read before the rule/metadata flatten, so only a top-level `name` is picked up here
    rule_name = kwargs.get('name')

    if 'rule' in kwargs and 'metadata' in kwargs:
        kwargs.update(kwargs.pop('metadata'))
        kwargs.update(kwargs.pop('rule'))

    # used only in skip_errors messages; .get() so that error reporting itself cannot raise KeyError
    rule_id = kwargs.get('id')

    rule_type = rule_type or kwargs.get('type') or \
        click.prompt('Rule type', type=click.Choice(typing.get_args(definitions.RuleType)))

    target_data_subclass = TOMLRuleContents.get_data_subclass(rule_type)
    schema = target_data_subclass.jsonschema()
    props = schema['properties']
    required_fields = schema.get('required', []) + additional_required
    contents = {}
    skipped = []

    for name, options in props.items():
        if name == 'index' and kwargs.get("type") == "esql":
            continue

        if name == 'type':
            contents[name] = rule_type
            continue

        # these are set at package release time depending on the version strategy
        if name in ('version', 'revision') and not BYPASS_VERSION_LOCK:
            continue

        if required_only and name not in required_fields:
            continue

        # build this from technique ID
        if name == 'threat':
            threat_map = []
            if not skip_errors:
                while click.confirm('add mitre tactic?'):
                    tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, is_required=True)
                    technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
                                                  is_required=False, enum=list(matrix[tactic])) or []

                    # a bad entry is reported and dropped; the user is then asked again
                    try:
                        threat_map.append(build_threat_map_entry(tactic, *technique_ids))
                    except KeyError as e:
                        click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
                    except ValueError as e:
                        click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)

            if threat_map:
                contents[name] = threat_map
            continue

        # a truthy pre-supplied value wins; falsy values fall through to the prompting logic below
        if kwargs.get(name):
            contents[name] = schema_prompt(name, value=kwargs.pop(name))
            continue

        if name == "new_terms":
            # patch to allow new_term imports
            # NOTE(review): no default on this pop - a missing `new_terms_fields` raises KeyError
            result = {"field": "new_terms_fields"}
            result["value"] = schema_prompt("new_terms_fields", value=kwargs.pop("new_terms_fields"))
            history_window_start_value = kwargs.pop("history_window_start", None)
            result["history_window_start"] = [
                {
                    "field": "history_window_start",
                    "value": schema_prompt("history_window_start", value=history_window_start_value),
                }
            ]
        elif skip_errors:
            # return missing information
            return f"Rule: {rule_id}, Rule Name: {rule_name} is missing {name} information"
        else:
            result = schema_prompt(name, is_required=name in required_fields, **options.copy())

        if result:
            if name not in required_fields and result == options.get('default', ''):
                skipped.append(name)
                continue

            contents[name] = result

    # DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
    suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
    path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()

    # inherit the maturity from the rule that already exists at this path, if any
    maturity = "development"
    if path.exists():
        rules = RuleCollection()
        rules.load_file(path)
        if rules:
            maturity = rules.rules[0].contents.metadata.maturity

    meta = {
        "creation_date": creation_date,
        "updated_date": creation_date,
        "maturity": maturity,
    }

    def _build_rule():
        # reads `contents` at call time, so it picks up an edited query on each retry
        return TOMLRule(path=path, contents=TOMLRuleContents.from_dict({'rule': contents, 'metadata': meta}))

    try:
        rule = _build_rule()
    except kql.KqlParseError as e:
        if skip_errors:
            return f"Rule: {rule_id}, Rule Name: {rule_name} query failed to parse: {e.error_msg}"

        if e.error_msg == 'Unknown field':
            warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or '
                       '`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep))
            click.secho(e.args[0], fg='red', err=True)
            click.secho(warning, fg='yellow', err=True)
            click.pause()

        # if failing due to a query, loop until resolved or terminated
        while True:
            try:
                edited_query = click.edit(contents['query'], extension='.eql')
                # click.edit returns None when the editor is closed without saving: keep the
                # previous query rather than replacing it with None
                if edited_query is not None:
                    contents['query'] = edited_query
                rule = _build_rule()
            except kql.KqlParseError as retry_error:
                click.secho(retry_error.args[0], fg='red', err=True)
                click.pause()

                if retry_error.error_msg.startswith("Unknown field"):
                    # get the latest schema for schema errors
                    clear_caches()
                    ecs.get_kql_schema(indexes=contents.get("index", []))
            else:
                break
    except Exception as e:
        if skip_errors:
            return f"Rule: {rule_id}, Rule Name: {rule_name} failed: {e}"
        # bare raise keeps the original traceback intact
        raise

    if save:
        rule.save_toml(strip_none_values=strip_none_values)

    if skipped:
        print('Did not set the following values because they are un-required when set to the default value')
        print(' - {}'.format('\n - '.join(skipped)))

    return rule