def validate_integration()

in detection_rules/rule_validators.py [0:0]


    def validate_integration(self, data: QueryRuleData, meta: RuleMeta,
                             package_integrations: List[dict]) -> Union[EQL_ERROR_TYPES, None, ValueError]:
        """Validate an EQL query while checking TOMLRule against integration schemas.

        The query is validated once for every entry yielded by
        ``get_integration_schema_data``. Unknown-field parse errors are collected
        (keyed by field name) instead of being returned right away; afterwards a
        collected error is discarded if the field is present in the schema of a
        different package or a different integration. Any other EQL parse error
        is returned as soon as it is encountered.

        Args:
            data: Rule data; ``index_or_dataview``, ``name`` and the ``notify``
                flag are read from it, and it is forwarded to
                ``validate_query_with_schema``.
            meta: Rule metadata; ``query_schema_validation``, ``maturity``,
                ``min_stack_version`` and ``integration`` are read from it.
            package_integrations: Dicts that each provide a ``"package"`` and an
                ``"integration"`` key (the latter may be falsy).

        Returns:
            ``None`` when schema validation is skipped or no error survives the
            cross-schema check; otherwise the error object, which is returned
            rather than raised.
        """
        if meta.query_schema_validation is False or meta.maturity == "deprecated":
            # syntax only, which is done via self.ast
            return None

        error_fields = {}
        package_schemas = {}

        # Initialize package_schemas with a nested structure
        for integration_data in package_integrations:
            package = integration_data["package"]
            integration = integration_data["integration"]
            if integration:
                package_schemas.setdefault(package, {}).setdefault(integration, {})
            else:
                package_schemas.setdefault(package, {})

        # Process each integration schema
        for integration_schema_data in get_integration_schema_data(
            data, meta, package_integrations
        ):
            ecs_version = integration_schema_data["ecs_version"]
            package = integration_schema_data["package"]
            integration = integration_schema_data["integration"]
            package_version = integration_schema_data["package_version"]
            integration_schema = integration_schema_data["schema"]
            stack_version = integration_schema_data["stack_version"]

            # add non-ecs-schema fields for edge cases not added to the integration
            if data.index_or_dataview:
                for index_name in data.index_or_dataview:
                    integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))

            # Add custom schema fields for appropriate stack version
            # (kept as a separate pass so custom fields are applied after all index schemas)
            if data.index_or_dataview and CUSTOM_RULES_DIR:
                for index_name in data.index_or_dataview:
                    integration_schema.update(**ecs.flatten(ecs.get_custom_index_schema(index_name, stack_version)))

            # add endpoint schema fields for multi-line fields
            integration_schema.update(**ecs.flatten(ecs.get_endpoint_schemas()))

            # NOTE(review): the flattened fields are merged directly into the package-level
            # dict, alongside any integration-name keys created above — confirm this is intended.
            package_schemas[package].update(**integration_schema)

            eql_schema = ecs.KqlSchema2Eql(integration_schema)
            err_trailer = (
                f"stack: {stack_version}, integration: {integration},"
                f"ecs: {ecs_version}, package: {package}, package_version: {package_version}"
            )

            # Validate the query against the schema
            exc = self.validate_query_with_schema(
                data=data,
                schema=eql_schema,
                err_trailer=err_trailer,
                min_stack_version=meta.min_stack_version,
            )

            if not isinstance(exc, eql.EqlParseError):
                continue

            message = exc.error_msg
            if message != "Unknown field" and "Field not recognized" not in message:
                # Anything other than an unknown-field error is reported immediately.
                return exc

            # Unknown-field errors are deferred: another schema may still define the field.
            field = extract_error_field(self.query, exc)
            trailer = (
                f"\n\tTry adding event.module or event.dataset to specify integration module\n\t"
                f"Will check against integrations {meta.integration} combined.\n\t"
                f"{package=}, {integration=}, {package_version=}, "
                f"{stack_version=}, {ecs_version=}"
            )
            error_fields[field] = {
                "error": exc,
                "trailer": trailer,
                "package": package,
                "integration": integration,
            }
            if data.get("notify", False):
                print(
                    f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}"
                )

        # Check error fields against schemas of different packages or different integrations.
        # Iterate over a snapshot because entries are removed from error_fields in the loop.
        for field, error_data in list(error_fields.items()):
            error_package = error_data["package"]
            error_integration = error_data["integration"]
            for package, integrations_or_schema in package_schemas.items():
                if error_integration is None:
                    # Compare against the schema directly if there's no integration
                    if error_package != package and field in integrations_or_schema:
                        # pop() instead of del: the field may match in several packages,
                        # and a second `del` would raise KeyError.
                        error_fields.pop(field, None)
                else:
                    # Compare against integration schemas
                    # NOTE(review): values here may be flattened field entries rather than
                    # nested integration schemas (see the merge above) — confirm intended.
                    for integration, schema in integrations_or_schema.items():
                        is_alt_schema = error_package != package or error_integration != integration
                        if is_alt_schema and field in schema:
                            # Idempotent removal for the same reason as above.
                            error_fields.pop(field, None)

        # return the first remaining error, if any
        if error_fields:
            return next(iter(error_fields.values()))["error"]
        return None