def kibana_import_rules()

in detection_rules/kbwrap.py [0:0]


def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Optional[bool] = False,
                        overwrite_exceptions: Optional[bool] = False,
                        overwrite_action_connectors: Optional[bool] = False) -> (dict, List[RuleResource]):
    """Import custom rules into Kibana.

    The rules are converted to their API format and sent, together with every exception list and
    action connector found in the default ``GenericCollection``, through ``RuleResource.import_rules``.
    A summary of successes and failures is echoed to the console.

    Args:
        ctx: Click context; ``ctx.obj['kibana']`` must hold the Kibana client used as a context manager.
        rules: Rules to import.
        overwrite: Forwarded to ``RuleResource.import_rules`` as ``overwrite``.
        overwrite_exceptions: Forwarded to ``RuleResource.import_rules`` as ``overwrite_exceptions``.
        overwrite_action_connectors: Forwarded to ``RuleResource.import_rules`` as
            ``overwrite_action_connectors``.

    Returns:
        A ``(response, results)`` pair: the import response and the results, both exactly as returned by
        ``RuleResource.import_rules``.
    """
    def _handle_response_errors(response: dict):
        """Echo every import error and print retry hints for the known transient failures."""
        def _parse_list_id(s: str):
            """Parse the list ID from the error message; return None when no ID is present."""
            match = re.search(r'list_id: "(.*?)"', s)
            return match.group(1) if match else None

        # Re-try to address known Kibana issue: https://github.com/elastic/kibana/issues/143864
        # Each kind of retryable failure is tracked separately so that one kind is reported even when
        # the other is absent, and so that each message carries its own rule count.
        exception_retry_ids = []
        connector_retry_ids = []

        # exception_dicts is consumed as a list of lists (one sublist per collection item).
        all_exception_list_ids = {
            exception["list_id"] for sublist in exception_dicts for exception in sublist
        }

        click.echo(f'{len(response["errors"])} rule(s) failed to import!')

        action_connector_validation_error = "Error validating create data"
        action_connector_type_error = "expected value of type [string] but got [undefined]"
        for error in response['errors']:
            error_message = error["error"]["message"]
            click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error_message}')

            if "references a non existent exception list" in error_message:
                # Only suggest a retry when the missing list is one that was part of this import.
                list_id = _parse_list_id(error_message)
                if list_id in all_exception_list_ids:
                    exception_retry_ids.append(error["rule_id"])

            if action_connector_validation_error in error_message and action_connector_type_error in error_message:
                connector_retry_ids.append(error["rule_id"])

        if exception_retry_ids:
            # De-duplicate while keeping the order in which the errors were reported.
            exception_retry_ids = list(dict.fromkeys(exception_retry_ids))
            click.echo(
                f"Missing exception list errors detected for {len(exception_retry_ids)} rules. "
                "Try re-importing using the following command and rule IDs:\n"
            )
            click.echo("python -m detection_rules kibana import-rules -o ", nl=False)
            click.echo(" ".join(f"-id {rule_id}" for rule_id in exception_retry_ids))
            click.echo()

        if connector_retry_ids:
            connector_retry_ids = list(dict.fromkeys(connector_retry_ids))
            click.echo(
                f"Connector still being built errors detected for {len(connector_retry_ids)} rules. "
                "Please try re-importing the rules again."
            )
            click.echo()

    def _process_imported_items(imported_items_list, item_type_description, item_key):
        """Display a success message listing the unique IDs of the imported items.

        ``imported_items_list`` is a list of lists of dicts; ``item_key`` names the ID field of each dict.
        Nothing is printed when there are no items.
        """
        all_ids = {item[item_key] for sublist in imported_items_list for item in sublist}
        if all_ids:
            click.echo(f'{len(all_ids)} {item_type_description} successfully imported')
            # Sort so the listing is stable between runs (set iteration order is arbitrary).
            ids_str = '\n - '.join(sorted(all_ids))
            click.echo(f' - {ids_str}')

    kibana = ctx.obj['kibana']
    rule_dicts = [r.contents.to_api_format() for r in rules]
    with kibana:
        # Exceptions and action connectors come from the default generic collection, not from `rules`.
        cl = GenericCollection.default()
        exception_dicts = [
            d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)
        ]
        action_connectors_dicts = [
            d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents)
        ]
        response, successful_rule_ids, results = RuleResource.import_rules(
            rule_dicts,
            exception_dicts,
            action_connectors_dicts,
            overwrite=overwrite,
            overwrite_exceptions=overwrite_exceptions,
            overwrite_action_connectors=overwrite_action_connectors
        )

    if successful_rule_ids:
        click.echo(f'{len(successful_rule_ids)} rule(s) successfully imported')
        rule_str = '\n - '.join(successful_rule_ids)
        click.echo(f' - {rule_str}')
    if response['errors']:
        _handle_response_errors(response)
    else:
        # Exception lists and connectors are only reported as imported when no rule errors occurred.
        _process_imported_items(exception_dicts, 'exception list(s)', 'list_id')
        _process_imported_items(action_connectors_dicts, 'action connector(s)', 'id')

    return response, results