in detection_rules/ecs.py [0:0]
def get_all_flattened_schema() -> dict:
"""Load all schemas into a flattened dictionary."""
all_flattened_schema = {}
for _, schema in get_non_ecs_schema().items():
all_flattened_schema.update(flatten(schema))
ecs_schemas = get_schemas()
for version in ecs_schemas:
for index, info in ecs_schemas[version]["ecs_flat"].items():
all_flattened_schema.update({index: info["type"]})
for _, integration_schema in load_integrations_schemas().items():
for index, index_schema in integration_schema.items():
# Detect if ML integration
if "jobs" in index_schema:
ml_schemas = {k: v for k, v in index_schema.items() if k != "jobs"}
for _, ml_schema in ml_schemas.items():
all_flattened_schema.update(flatten(ml_schema))
else:
all_flattened_schema.update(flatten(index_schema))
return all_flattened_schema