in detection_rules/rule.py [0:0]
def get_required_fields(self, index: str) -> List[Optional[dict]]:
    """Retrieve the fields needed for the query, with type information from the schemas.

    Each entry of ``self.unique_fields`` is first looked up in the ECS schema
    for the current package version. Fields not found there fall back to the
    first non-empty schema among: integration schemas (built from the datasets
    referenced in ``self.ast``), the beats schema, then the endgame schema.

    Args:
        index: Index pattern(s) forwarded to ``get_beats_schema`` and
            ``get_endgame_schema``; a falsy value is replaced by ``[]``.

    Returns:
        A list of ``{"name", "type", "ecs"}`` dicts sorted by ``name``. ``type``
        is ``'unknown'`` when no schema yields a type, and ``ecs`` is ``True``
        only when the ECS schema supplied the type. ``ESQLValidator`` instances
        always get an empty list.
    """
    if isinstance(self, ESQLValidator):
        return []

    current_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)

    # Resolve the stack entry once instead of re-indexing it per schema kind.
    stack_versions = get_stack_schemas()[str(current_version)]
    ecs_version = stack_versions['ecs']
    beats_version = stack_versions['beats']
    endgame_version = stack_versions['endgame']
    ecs_schema = ecs.get_schema(ecs_version)

    indices = index or []
    # Only the beats field schema is needed; the other two returned values are unused here.
    _, beat_schema, _ = self.get_beats_schema(indices, beats_version, ecs_version)
    endgame_schema = self.get_endgame_schema(indices, endgame_version)

    # construct integration schemas
    packages_manifest = load_integrations_manifests()
    integrations_schemas = load_integrations_schemas()
    datasets, _ = beats.get_datasets_and_modules(self.ast)
    package_integrations = parse_datasets(datasets, packages_manifest)

    int_schema = {}
    # The same dict is handed to every call, exactly as before.
    data = {"notify": False}
    for pk_int in package_integrations:
        integration_fields, _ = get_integration_schema_fields(
            integrations_schemas, pk_int["package"], pk_int["integration"],
            current_version, packages_manifest, {}, data)
        int_schema.update(integration_fields)

    required = []
    for fld in self.unique_fields or []:
        field_type = ecs_schema.get(fld, {}).get('type')
        is_ecs = field_type is not None

        if not is_ecs:
            # NOTE(review): only the first non-empty fallback schema is consulted;
            # a field absent from a non-empty integration schema is never looked up
            # in the beats/endgame schemas. Behavior kept as-is - confirm it is intended.
            if int_schema:
                field_type = int_schema.get(fld, None)
            elif beat_schema:
                field_type = beat_schema.get(fld, {}).get('type')
            elif endgame_schema:
                field_type = endgame_schema.endgame_schema.get(fld, None)

        required.append({'name': fld, 'type': field_type or 'unknown', 'ecs': is_ecs})

    return sorted(required, key=lambda f: f['name'])