def get_eql_schema()

in detection_rules/ecs.py [0:0]


def get_eql_schema(version=None, index_patterns=None):
    """Return schema in expected format for eql."""
    schema = get_schema(version, name='ecs_flat')
    str_types = ('text', 'ip', 'keyword', 'date', 'object', 'geo_point')
    num_types = ('float', 'integer', 'long')
    schema = schema.copy()

    def convert_type(t):
        return 'string' if t in str_types else 'number' if t in num_types else 'boolean'

    converted = {}

    for field, schema_info in schema.items():
        field_type = schema_info.get('type', '')
        add_field(converted, field, convert_type(field_type))

    # add non-ecs schema
    if index_patterns:
        for index_name in index_patterns:
            for k, v in flatten(get_index_schema(index_name)).items():
                add_field(converted, k, convert_type(v))

    # add custom schema
    if index_patterns and CUSTOM_RULES_DIR:
        for index_name in index_patterns:
            for k, v in flatten(get_custom_index_schema(index_name)).items():
                add_field(converted, k, convert_type(v))

    # add endpoint custom schema
    for k, v in flatten(get_endpoint_schemas()).items():
        add_field(converted, k, convert_type(v))

    return converted