in detection_rules/ecs.py [0:0]
def get_eql_schema(version=None, index_patterns=None):
    """Build the field-to-type schema in the shape eql expects.

    Fields are merged into a single result via ``add_field`` in this order:
    the flat ECS schema for ``version``, the schema of every index in
    ``index_patterns``, the custom schema of every index (only when
    ``CUSTOM_RULES_DIR`` is set), and finally the endpoint schemas.

    Args:
        version: Passed through to ``get_schema`` to select the ECS schema.
        index_patterns: Optional iterable of index names whose schemas are
            merged in on top of ECS. Skipped when falsy.

    Returns:
        The dict populated by ``add_field``, whose type values are one of
        ``'string'``, ``'number'`` or ``'boolean'``.
    """
    # Tuples (not sets) on purpose: membership then also works for
    # unhashable values coming out of the flattened schemas.
    string_kinds = ('text', 'ip', 'keyword', 'date', 'object', 'geo_point')
    numeric_kinds = ('float', 'integer', 'long')

    def convert_type(t):
        # Anything that is neither a known string nor numeric type is
        # reported as 'boolean'.
        if t in string_kinds:
            return 'string'
        if t in numeric_kinds:
            return 'number'
        return 'boolean'

    converted = {}

    def merge_flattened(nested_schema):
        # Flatten a nested schema and register each field with its eql type.
        for name, raw_type in flatten(nested_schema).items():
            add_field(converted, name, convert_type(raw_type))

    # ECS fields: work on a shallow copy of the flat schema.
    ecs_fields = get_schema(version, name='ecs_flat').copy()
    for name, details in ecs_fields.items():
        add_field(converted, name, convert_type(details.get('type', '')))

    # add non-ecs schema
    if index_patterns:
        for index_name in index_patterns:
            merge_flattened(get_index_schema(index_name))

    # add custom schema
    if index_patterns and CUSTOM_RULES_DIR:
        for index_name in index_patterns:
            merge_flattened(get_custom_index_schema(index_name))

    # add endpoint custom schema (always, independent of index_patterns)
    merge_flattened(get_endpoint_schemas())

    return converted