scripts/generators/es_template.py (250 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import json import sys from typing import ( Dict, List, Optional, Union ) from os.path import join from generators import ecs_helpers from _types import ( Field, FieldNestedEntry, ) # Composable Template def generate( ecs_nested: Dict[str, FieldNestedEntry], ecs_version: str, out_dir: str, mapping_settings_file: str, template_settings_file: str ) -> None: """This generates all artifacts for the composable template approach""" all_component_templates(ecs_nested, ecs_version, out_dir) component_names = component_name_convention(ecs_version, ecs_nested) save_composable_template(ecs_version, component_names, out_dir, mapping_settings_file, template_settings_file) def save_composable_template(ecs_version, component_names, out_dir, mapping_settings_file, template_settings_file): mappings_section = mapping_settings(mapping_settings_file) template = template_settings(ecs_version, mappings_section, template_settings_file, component_names=component_names) filename = join(out_dir, "elasticsearch/composable/template.json") save_json(filename, template) def all_component_templates( ecs_nested: Dict[str, FieldNestedEntry], ecs_version: str, out_dir: str ) -> None: """Generate one component template per field set""" component_dir: str = join(out_dir, 'elasticsearch/composable/component') ecs_helpers.make_dirs(component_dir) for (fieldset_name, fieldset) in ecs_helpers.remove_top_level_reusable_false(ecs_nested).items(): field_mappings = {} for (flat_name, field) in fieldset['fields'].items(): name_parts = flat_name.split('.') dict_add_nested(field_mappings, name_parts, entry_for(field)) save_component_template(fieldset_name, field['level'], ecs_version, component_dir, field_mappings) def save_component_template( template_name: str, field_level: str, ecs_version: str, out_dir: str, field_mappings: Dict ) -> None: filename: str = join(out_dir, template_name) + ".json" reference_url: str = "https://www.elastic.co/guide/en/ecs/current/ecs-{}.html".format(template_name) template: Dict = { 'template': {'mappings': {'properties': field_mappings}}, '_meta': { 'ecs_version': ecs_version, } } """Only generate a documentation link for ECS fields""" if (field_level != 'custom'): template['_meta']['documentation'] = reference_url save_json(filename, template) def component_name_convention( ecs_version: str, ecs_nested: Dict[str, FieldNestedEntry] ) -> List[str]: version: str = ecs_version.replace('+', '-') names: List[str] = [] for (fieldset_name, fieldset) in ecs_helpers.remove_top_level_reusable_false(ecs_nested).items(): names.append("ecs_{}_{}".format(version, fieldset_name.lower())) return names # Legacy template def generate_legacy( ecs_flat: Dict[str, Field], ecs_version: str, out_dir: str, mapping_settings_file: str, template_settings_file: str ) -> None: """Generate the legacy index template""" field_mappings = {} for flat_name in sorted(ecs_flat): field = ecs_flat[flat_name] name_parts = flat_name.split('.') dict_add_nested(field_mappings, name_parts, entry_for(field)) mappings_section: Dict = mapping_settings(mapping_settings_file) mappings_section['properties'] = field_mappings generate_legacy_template_version(ecs_version, mappings_section, out_dir, template_settings_file) def generate_legacy_template_version( ecs_version: str, mappings_section: Dict, out_dir: str, template_settings_file: str ) -> None: ecs_helpers.make_dirs(join(out_dir, 'elasticsearch', "legacy")) template: Dict = template_settings(ecs_version, mappings_section, template_settings_file, is_legacy=True) filename: str = join(out_dir, "elasticsearch/legacy/template.json") save_json(filename, template) # Common helpers def dict_add_nested( dct: Dict, name_parts: List[str], value: Dict ) -> None: current_nesting: str = name_parts[0] rest_name_parts: List[str] = name_parts[1:] if len(rest_name_parts) > 0: dct.setdefault(current_nesting, {}) dct[current_nesting].setdefault('properties', {}) dict_add_nested( dct[current_nesting]['properties'], rest_name_parts, value) else: if current_nesting in dct and 'type' in value and 'object' == value['type']: return dct[current_nesting] = value def entry_for(field: Field) -> Dict: field_entry: Dict = {'type': field['type']} try: if field['type'] == 'object' or field['type'] == 'nested': if 'enabled' in field and not field['enabled']: ecs_helpers.dict_copy_existing_keys(field, field_entry, ['enabled']) # the index field is only valid for field types that are not object and nested elif 'index' in field and not field['index']: ecs_helpers.dict_copy_existing_keys(field, field_entry, ['index', 'doc_values']) if field['type'] == 'keyword' or field['type'] == 'flattened': ecs_helpers.dict_copy_existing_keys(field, field_entry, ['ignore_above', 'synthetic_source_keep']) elif field['type'] == 'constant_keyword': ecs_helpers.dict_copy_existing_keys(field, field_entry, ['value']) elif field['type'] == 'text': ecs_helpers.dict_copy_existing_keys(field, field_entry, ['norms']) elif field['type'] == 'alias': ecs_helpers.dict_copy_existing_keys(field, field_entry, ['path']) elif field['type'] == 'scaled_float': ecs_helpers.dict_copy_existing_keys(field, field_entry, ['scaling_factor']) if 'multi_fields' in field: field_entry['fields'] = {} for mf in field['multi_fields']: mf_type = mf['type'] mf_entry = {'type': mf_type} if mf_type == 'keyword': ecs_helpers.dict_copy_existing_keys(mf, mf_entry, ['normalizer', 'ignore_above']) elif mf_type == 'text': ecs_helpers.dict_copy_existing_keys(mf, mf_entry, ['norms', 'analyzer']) if 'parameters' in mf: mf_entry.update(mf['parameters']) field_entry['fields'][mf['name']] = mf_entry if 'parameters' in field: field_entry.update(field['parameters']) except KeyError as ex: print("Exception {} occurred for field {}".format(ex, field)) raise ex return field_entry def mapping_settings(mapping_settings_file: str) -> Dict: if mapping_settings_file: with open(mapping_settings_file) as f: mappings = json.load(f) else: mappings = default_mapping_settings() return mappings def template_settings( ecs_version: str, mappings_section: Dict, template_settings_file: Union[str, None], is_legacy: Optional[bool] = False, component_names: Optional[List[str]] = None ) -> Dict: if template_settings_file: with open(template_settings_file) as f: template = json.load(f) else: if is_legacy: template = default_legacy_template_settings(ecs_version) else: template = default_template_settings(ecs_version) finalize_template(template, ecs_version, is_legacy, mappings_section, component_names) return template def finalize_template( template: Dict, ecs_version: str, is_legacy: bool, mappings_section: Dict, component_names: List[str] ) -> None: if is_legacy: if mappings_section: template['mappings'] = mappings_section # _meta can't be at template root in legacy templates, so moving back to mappings section # if present if '_meta' in template: mappings_section['_meta'] = template.pop('_meta') else: template['template']['mappings'] = mappings_section template['composed_of'] = component_names template['_meta'] = { "ecs_version": ecs_version, "description": "Sample composable template that includes all ECS fields" } def save_json(file: str, data: Dict) -> None: open_mode = "wb" if sys.version_info >= (3, 0): open_mode = "w" with open(file, open_mode) as jsonfile: json.dump(data, jsonfile, indent=2, sort_keys=True) jsonfile.write('\n') def default_template_settings(ecs_version: str) -> Dict: return { "index_patterns": ["try-ecs-*"], "_meta": { "ecs_version": ecs_version, "description": "Sample composable template that includes all ECS fields" }, "priority": 1, # Very low, as this is a sample template "template": { "settings": { "index": { "codec": "best_compression", "mapping": { "total_fields": { "limit": 2000 } } } }, } } def default_legacy_template_settings(ecs_version: str) -> Dict: return { "index_patterns": ["try-ecs-*"], "_meta": {"version": ecs_version}, "order": 1, "settings": { "index": { "mapping": { "total_fields": { "limit": 10000 } }, "refresh_interval": "5s" } } } def default_mapping_settings() -> Dict: return { "date_detection": False, "dynamic_templates": [ { "strings_as_keyword": { "mapping": { "ignore_above": 1024, "type": "keyword" }, "match_mapping_type": "string" } } ] }