scripts/generators/ecs_helpers.py (173 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import glob import os import yaml import git import pathlib from typing import ( Any, Dict, List, Optional, OrderedDict, Set, Union, ) import warnings from collections import OrderedDict from copy import deepcopy from _types import ( Field, FieldEntry, FieldNestedEntry, ) # Dictionary helpers def dict_copy_keys_ordered(dct: Field, copied_keys: List[str]) -> Field: ordered_dict = OrderedDict() for key in copied_keys: if key in dct: ordered_dict[key] = dct[key] return ordered_dict def dict_copy_existing_keys(source: Field, destination: Field, keys: List[str]) -> None: for key in keys: if key in source: destination[key] = source[key] def dict_sorted_by_keys(dct: FieldNestedEntry, sort_keys: List[str]) -> List[FieldNestedEntry]: if not isinstance(sort_keys, list): sort_keys = [sort_keys] tuples: List[List[Union[int, str, FieldNestedEntry]]] = [] for key in dct: nested = dct[key] sort_criteria = [] for sort_key in sort_keys: sort_criteria.append(nested[sort_key]) sort_criteria.append(nested) tuples.append(sort_criteria) return list(map(lambda t: t[-1], sorted(tuples))) def ordered_dict_insert( dct: Field, new_key: str, new_value: Union[str, bool], before_key: Optional[str] = None, after_key: Optional[str] = None ) -> None: output = OrderedDict() inserted: bool = False for key, value in dct.items(): if not inserted and before_key is not None and key == before_key: output[new_key] = new_value inserted = True output[key] = value if not inserted and after_key is not None and key == after_key: output[new_key] = new_value inserted = True if not inserted: output[new_key] = new_value dct.clear() for key, value in output.items(): dct[key] = value def safe_merge_dicts(a: Dict[Any, Any], b: Dict[Any, Any]) -> Dict[Any, Any]: """Merges two dictionaries into one. If duplicate keys are detected a ValueError is raised.""" c = deepcopy(a) for key in b: if key not in c: c[key] = b[key] else: raise ValueError('Duplicate key found when merging dictionaries: {0}'.format(key)) return c def fields_subset(subset, fields): retained_fields = {} allowed_options = ['fields'] for key, val in subset.items(): for option in val: if option not in allowed_options: raise ValueError('Unsupported option found in subset: {}'.format(option)) # A missing fields key is shorthand for including all subfields if 'fields' not in val or val['fields'] == '*': retained_fields[key] = fields[key] elif isinstance(val['fields'], dict): # Copy the full field over so we get all the options, then replace the 'fields' with the right subset retained_fields[key] = fields[key] retained_fields[key]['fields'] = fields_subset(val['fields'], fields[key]['fields']) return retained_fields def yaml_ordereddict(dumper, data): # YAML representation of an OrderedDict will be like a dictionary, but # respecting the order of the dictionary. # Almost sure it's unndecessary with Python 3. value = [] for item_key, item_value in data.items(): node_key = dumper.represent_data(item_key) node_value = dumper.represent_data(item_value) value.append((node_key, node_value)) return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', value) yaml.add_representer(OrderedDict, yaml_ordereddict) def dict_clean_string_values(dict: Dict[Any, Any]) -> None: """Remove superfluous spacing in all field values of a dict""" for key in dict: value = dict[key] if isinstance(value, str): dict[key] = value.strip() # File helpers YAML_EXT = {'yml', 'yaml'} def is_yaml(path: str) -> bool: """Returns True if path matches an element of the yaml extensions set""" return set(path.split('.')[1:]).intersection(YAML_EXT) != set() def safe_list(o: Union[str, List[str]]) -> List[str]: """converts o to a list if it isn't already a list""" if isinstance(o, list): return o else: return o.split(',') def glob_yaml_files(paths: List[str]) -> List[str]: """Accepts string, or list representing a path, wildcard or folder. Returns list of matched yaml files""" all_files: List[str] = [] for path in safe_list(paths): if is_yaml(path): all_files.extend(glob.glob(path)) else: for t in YAML_EXT: all_files.extend(glob.glob(os.path.join(path, '*.' + t))) return sorted(all_files) def get_tree_by_ref(ref: str) -> git.objects.tree.Tree: repo: git.repo.base.Repo = git.Repo(os.getcwd()) commit: git.objects.commit.Commit = repo.commit(ref) return commit.tree def path_exists_in_git_tree(tree: git.objects.tree.Tree, file_path: str) -> bool: try: _ = tree[file_path] except KeyError: return False return True def usage_doc_files() -> List[str]: usage_docs_dir: str = os.path.join(os.path.dirname(__file__), '../../docs/reference') usage_docs_path: pathlib.PosixPath = pathlib.Path(usage_docs_dir) if usage_docs_path.is_dir(): return [x.name for x in usage_docs_path.glob('ecs-*-usage.md') if x.is_file()] return [] def ecs_files() -> List[str]: """Return the schema file list to load""" schema_glob: str = os.path.join(os.path.dirname(__file__), '../../schemas/*.yml') return sorted(glob.glob(schema_glob)) def make_dirs(path: str) -> None: try: os.makedirs(path, exist_ok=True) except OSError as e: print('Unable to create output directory: {}'.format(e)) raise e def yaml_dump( filename: str, data: Dict[str, FieldNestedEntry], preamble: Optional[str] = None ) -> None: with open(filename, 'w') as outfile: if preamble: outfile.write(preamble) yaml.dump(data, outfile, default_flow_style=False) def yaml_load(filename: str) -> Set[str]: with open(filename) as f: return yaml.safe_load(f.read()) # List helpers def list_subtract(original: List[Any], subtracted: List[Any]) -> List[Any]: """Subtract two lists. original = subtracted""" return [item for item in original if item not in subtracted] def list_extract_keys(lst: List[Field], key_name: str) -> List[str]: """Returns an array of values for 'key_name', from a list of dictionaries""" acc = [] for d in lst: acc.append(d[key_name]) return acc # Helpers for the deeply nested fields structure def is_intermediate(field: FieldEntry) -> bool: """Encapsulates the check to see if a field is an intermediate field or a "real" field.""" return ('intermediate' in field['field_details'] and field['field_details']['intermediate']) def remove_top_level_reusable_false(ecs_nested: Dict[str, FieldNestedEntry]) -> Dict[str, FieldNestedEntry]: """Returns same structure as ecs_nested, but skips all field sets with reusable.top_level: False""" components: Dict[str, FieldNestedEntry] = {} for (fieldset_name, fieldset) in ecs_nested.items(): if fieldset.get('reusable', None): if not fieldset['reusable']['top_level']: continue components[fieldset_name] = fieldset return components # Warning helper def strict_warning(msg: str) -> None: """Call warnings.warn(msg) for operations that would throw an Exception if operating in `--strict` mode. Allows a custom message to be passed. :param msg: custom text which will be displayed with wrapped boilerplate for strict warning messages. """ warn_message: str = f"{msg}\n\nThis will cause an exception when running in strict mode.\nWarning check:" warnings.warn(warn_message, stacklevel=3)