def get_all_flattened_schema()

in detection_rules/ecs.py [0:0]


def get_all_flattened_schema() -> dict:
    """Load all schemas into a flattened dictionary."""
    all_flattened_schema = {}
    for _, schema in get_non_ecs_schema().items():
        all_flattened_schema.update(flatten(schema))

    ecs_schemas = get_schemas()
    for version in ecs_schemas:
        for index, info in ecs_schemas[version]["ecs_flat"].items():
            all_flattened_schema.update({index: info["type"]})

    for _, integration_schema in load_integrations_schemas().items():
        for index, index_schema in integration_schema.items():
            # Detect if ML integration
            if "jobs" in index_schema:
                ml_schemas = {k: v for k, v in index_schema.items() if k != "jobs"}
                for _, ml_schema in ml_schemas.items():
                    all_flattened_schema.update(flatten(ml_schema))
            else:
                all_flattened_schema.update(flatten(index_schema))

    return all_flattened_schema