def get_required_fields()

in detection_rules/rule.py [0:0]


    def get_required_fields(self, index: str) -> List[Optional[dict]]:
        """Retrieves fields needed for the query along with type information from the schema."""
        if isinstance(self, ESQLValidator):
            return []

        current_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
        ecs_version = get_stack_schemas()[str(current_version)]['ecs']
        beats_version = get_stack_schemas()[str(current_version)]['beats']
        endgame_version = get_stack_schemas()[str(current_version)]['endgame']
        ecs_schema = ecs.get_schema(ecs_version)

        beat_types, beat_schema, schema = self.get_beats_schema(index or [], beats_version, ecs_version)
        endgame_schema = self.get_endgame_schema(index or [], endgame_version)

        # construct integration schemas
        packages_manifest = load_integrations_manifests()
        integrations_schemas = load_integrations_schemas()
        datasets, _ = beats.get_datasets_and_modules(self.ast)
        package_integrations = parse_datasets(datasets, packages_manifest)
        int_schema = {}
        data = {"notify": False}

        for pk_int in package_integrations:
            package = pk_int["package"]
            integration = pk_int["integration"]
            schema, _ = get_integration_schema_fields(integrations_schemas, package, integration,
                                                      current_version, packages_manifest, {}, data)
            int_schema.update(schema)

        required = []
        unique_fields = self.unique_fields or []

        for fld in unique_fields:
            field_type = ecs_schema.get(fld, {}).get('type')
            is_ecs = field_type is not None

            if not is_ecs:
                if int_schema:
                    field_type = int_schema.get(fld, None)
                elif beat_schema:
                    field_type = beat_schema.get(fld, {}).get('type')
                elif endgame_schema:
                    field_type = endgame_schema.endgame_schema.get(fld, None)

            required.append(dict(name=fld, type=field_type or 'unknown', ecs=is_ecs))

        return sorted(required, key=lambda f: f['name'])