def rule_prompt()

in detection_rules/cli_utils.py [0:0]


def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbose=False,
                additional_required: Optional[list] = None, skip_errors: bool = False, strip_none_values=True, **kwargs,
                ) -> typing.Union[TOMLRule, str]:
    """Prompt loop to build a rule.

    Iterates over the JSON-schema properties of the data class selected by the rule type. Each property is
    filled from ``kwargs`` when a truthy value was supplied, otherwise the user is prompted for it.

    Args:
        path: Destination path of the rule file. When falsy, the user is asked for one; the suggestion is the
            first entry of ``DEFAULT_PREBUILT_RULES_DIRS`` joined with the rule name.
        rule_type: Rule type used to select the data class. Falls back to ``kwargs['type']`` and finally to an
            interactive choice.
        required_only: When true, properties that are not listed as required are skipped.
        save: When true, the built rule is written with ``save_toml``.
        verbose: When true (and ``path`` is set), echo the path being built.
        additional_required: Extra property names treated as required on top of the schema's own list.
        skip_errors: When true, nothing is prompted for missing fields or MITRE tactics; instead a message
            describing the first problem is returned.
        strip_none_values: Forwarded to ``save_toml``.
        **kwargs: Pre-supplied field values. May also be a full rule document with ``rule`` and ``metadata``
            keys, in which case both are merged into one flat mapping.

    Returns:
        The built ``TOMLRule``, or - only when ``skip_errors`` is true - a string describing why the rule
        could not be built.

    Raises:
        Exception: Any error other than a KQL parse error raised while building the rule is re-raised when
            ``skip_errors`` is false.
    """
    from .misc import schema_prompt

    additional_required = additional_required or []
    creation_date = datetime.date.today().strftime("%Y/%m/%d")
    if verbose and path:
        click.echo(f'[+] Building rule for {path}')

    # work on a copy: values are popped from kwargs as they are consumed below
    kwargs = copy.deepcopy(kwargs)

    rule_name = kwargs.get('name')

    # flatten a full rule document into a single mapping of field -> value
    if 'rule' in kwargs and 'metadata' in kwargs:
        kwargs.update(kwargs.pop('metadata'))
        kwargs.update(kwargs.pop('rule'))

    # read once with .get so the skip_errors messages below can never raise KeyError themselves
    rule_id = kwargs.get('id')

    rule_type = rule_type or kwargs.get('type') or \
        click.prompt('Rule type', type=click.Choice(typing.get_args(definitions.RuleType)))

    target_data_subclass = TOMLRuleContents.get_data_subclass(rule_type)
    schema = target_data_subclass.jsonschema()
    props = schema['properties']
    required_fields = schema.get('required', []) + additional_required
    contents = {}
    skipped = []

    for name, options in props.items():

        if name == 'index' and kwargs.get("type") == "esql":
            continue

        if name == 'type':
            contents[name] = rule_type
            continue

        # these are set at package release time depending on the version strategy
        if name in ('version', 'revision') and not BYPASS_VERSION_LOCK:
            continue

        if required_only and name not in required_fields:
            continue

        # build this from technique ID
        if name == 'threat':
            threat_map = []
            if not skip_errors:
                while click.confirm('add mitre tactic?'):
                    tactic = schema_prompt('mitre tactic name', type='string', enum=tactics, is_required=True)
                    technique_ids = schema_prompt(f'technique or sub-technique IDs for {tactic}', type='array',
                                                  is_required=False, enum=list(matrix[tactic])) or []

                    try:
                        threat_map.append(build_threat_map_entry(tactic, *technique_ids))
                    except KeyError as e:
                        click.secho(f'Unknown ID: {e.args[0]} - entry not saved for: {tactic}', fg='red', err=True)
                        continue
                    except ValueError as e:
                        click.secho(f'{e} - entry not saved for: {tactic}', fg='red', err=True)
                        continue

            if threat_map:
                contents[name] = threat_map
            continue

        # a truthy pre-supplied value wins over prompting
        if kwargs.get(name):
            contents[name] = schema_prompt(name, value=kwargs.pop(name))
            continue

        if name == "new_terms":
            # patch to allow new_term imports
            result = {"field": "new_terms_fields"}
            new_terms_fields_value = schema_prompt("new_terms_fields", value=kwargs.pop("new_terms_fields", None))
            result["value"] = ensure_list_of_strings(new_terms_fields_value)
            history_window_start_value = kwargs.pop("history_window_start", None)
            result["history_window_start"] = [
                {
                    "field": "history_window_start",
                    "value": schema_prompt("history_window_start", value=history_window_start_value),
                }
            ]

        else:
            if skip_errors:
                # return missing information
                return f"Rule: {rule_id}, Rule Name: {rule_name} is missing {name} information"
            else:
                result = schema_prompt(name, is_required=name in required_fields, **options.copy())
        if result:
            # optional values equal to the schema default are left out of the rule and reported at the end
            if name not in required_fields and result == options.get('default', ''):
                skipped.append(name)
                continue

            contents[name] = result

    # DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
    suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
    path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()
    # Inherit maturity and optionally local dates from the rule if it already exists
    meta = {
        "creation_date": kwargs.get("creation_date") or creation_date,
        "updated_date": kwargs.get("updated_date") or creation_date,
        # the supplied value must come first: `"development" or x` always evaluates to "development"
        "maturity": kwargs.get("maturity") or "development",
    }

    try:
        rule = TOMLRule(path=path, contents=TOMLRuleContents.from_dict({'rule': contents, 'metadata': meta}))
    except kql.KqlParseError as e:
        if skip_errors:
            return f"Rule: {rule_id}, Rule Name: {rule_name} query failed to parse: {e.error_msg}"
        if e.error_msg == 'Unknown field':
            warning = ('If using a non-ECS field, you must update "ecs{}.non-ecs-schema.json" under `beats` or '
                       '`legacy-endgame` (Non-ECS fields should be used minimally).'.format(os.path.sep))
            click.secho(e.args[0], fg='red', err=True)
            click.secho(warning, fg='yellow', err=True)
            click.pause()

        # if failing due to a query, loop until resolved or terminated
        while True:
            try:
                # NOTE(review): click.edit returns None when the editor exits without saving; that value is
                # passed to from_dict unchanged - confirm this is the intended way to abort the loop
                contents['query'] = click.edit(contents['query'], extension='.eql')
                rule = TOMLRule(path=path,
                                contents=TOMLRuleContents.from_dict({'rule': contents, 'metadata': meta}))
            except kql.KqlParseError as e:
                click.secho(e.args[0], fg='red', err=True)
                click.pause()

                if e.error_msg.startswith("Unknown field"):
                    # get the latest schema for schema errors
                    clear_caches()
                    ecs.get_kql_schema(indexes=contents.get("index", []))
                continue

            break
    except Exception as e:
        if skip_errors:
            return f"Rule: {rule_id}, Rule Name: {rule_name} failed: {e}"
        raise

    if save:
        rule.save_toml(strip_none_values=strip_none_values)

    if skipped:
        print('Did not set the following values because they are un-required when set to the default value')
        print(' - {}'.format('\n - '.join(skipped)))

    return rule