in detection_rules/rule_validators.py [0:0]
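# NOTE: this excerpt relies on the module-level imports of rule_validators.py
# (typing.List/Union, eql, the ecs schema helpers, QueryRuleData, RuleMeta,
# EQL_ERROR_TYPES, CUSTOM_RULES_DIR, get_integration_schema_data, and
# extract_error_field), which are not shown here.
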
def validate_integration(
    self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]
) -> Union[EQL_ERROR_TYPES, None, ValueError]:
    """Validate an EQL query while checking TOMLRule against integration schemas."""
    if meta.query_schema_validation is False or meta.maturity == "deprecated":
        # syntax-only validation, which is handled via self.ast
        return
    error_fields = {}
    package_schemas = {}

    # Initialize package_schemas with a nested structure
    for integration_data in package_integrations:
        package = integration_data["package"]
        integration = integration_data["integration"]
        if integration:
            package_schemas.setdefault(package, {}).setdefault(integration, {})
        else:
            package_schemas.setdefault(package, {})
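    # package_schemas now maps package -> {integration -> schema} for packages
    # with integrations, and package -> schema otherwise; the schemas themselves
    # are populated by the loop below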

    # Process each integration schema
    for integration_schema_data in get_integration_schema_data(data, meta, package_integrations):
        ecs_version = integration_schema_data["ecs_version"]
        package = integration_schema_data["package"]
        integration = integration_schema_data["integration"]
        package_version = integration_schema_data["package_version"]
        integration_schema = integration_schema_data["schema"]
        stack_version = integration_schema_data["stack_version"]

        # add non-ecs-schema fields for edge cases not added to the integration
        if data.index_or_dataview:
            for index_name in data.index_or_dataview:
                integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))

        # add custom schema fields for the appropriate stack version
        if data.index_or_dataview and CUSTOM_RULES_DIR:
            for index_name in data.index_or_dataview:
                integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))

        # add endpoint schema fields for multi-line fields
        integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))

        # store the combined schema in the nested structure so the cross-check
        # below can find it under the right package/integration
        if integration:
            package_schemas[package][integration].update(**integration_schema)
        else:
            package_schemas[package].update(**integration_schema)

        eql_schema = ecs.KqlSchema2Eql(integration_schema)
        err_trailer = (
            f"stack: {stack_version}, integration: {integration}, "
            f"ecs: {ecs_version}, package: {package}, package_version: {package_version}"
        )

        # Validate the query against the schema
        exc = self.validate_query_with_schema(
            data=data,
            schema=eql_schema,
            err_trailer=err_trailer,
            min_stack_version=meta.min_stack_version,
        )

        if isinstance(exc, eql.EqlParseError):
            message = exc.error_msg
            if message == "Unknown field" or "Field not recognized" in message:
                # collect unknown-field errors rather than failing immediately;
                # the field may exist in another package or integration schema
                field = extract_error_field(self.query, exc)
                trailer = (
                    f"\n\tTry adding event.module or event.dataset to specify the integration module\n\t"
                    f"Will check against integrations {meta.integration} combined.\n\t"
                    f"{package=}, {integration=}, {package_version=}, "
                    f"{stack_version=}, {ecs_version=}"
                )
                error_fields[field] = {
                    "error": exc,
                    "trailer": trailer,
                    "package": package,
                    "integration": integration,
                }
                if data.get("notify", False):
                    print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
            else:
                # any other parse error is fatal
                return exc
        elif exc:
            # non-parse errors (e.g. ValueError) are returned as-is
            return exc

    # Check the error fields against the schemas of other packages or integrations
    for field, error_data in list(error_fields.items()):
        error_package = error_data["package"]
        error_integration = error_data["integration"]
        for package, integrations_or_schema in package_schemas.items():
            if error_integration is None:
                # Compare against the package schema directly if there is no integration
                if error_package != package and field in integrations_or_schema:
                    # pop() rather than del: the field may match several
                    # alternative schemas, and a second del would raise KeyError
                    error_fields.pop(field, None)
            else:
                # Compare against integration schemas
                for integration, schema in integrations_or_schema.items():
                    check_alt_schema = (
                        error_package != package or  # noqa: W504
                        (error_package == package and error_integration != integration)
                    )
                    if check_alt_schema and field in schema:
                        error_fields.pop(field, None)

    # return the first remaining error
    if error_fields:
        _, first_error = next(iter(error_fields.items()))
        return first_error["error"]
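
# Behavior summary: an unknown-field error is surfaced only when the field is
# missing from every alternative package/integration schema; any other parse
# error (or a non-parse error such as ValueError) is returned immediately, and
# a query that validates against all schemas falls through to an implicit None.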