scripts/schema/subset_filter.py (139 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import copy import os from typing import ( Any, Dict, List, Optional, Tuple, ) from generators import intermediate_files from schema import ( cleaner, loader, ) from _types import ( FieldEntry ) # This script takes all ECS and custom fields already loaded, and lets users # filter out the ones they don't need. 
def filter(
    fields: Dict[str, FieldEntry],
    subset_file_globs: List[str],
    out_dir: str
) -> Tuple[Dict[str, FieldEntry], Dict[str, FieldEntry]]:
    """Filter the loaded fields down to the requested subsets.

    Each subset definition is first extracted on its own and handed to
    ``intermediate_files.generate`` under ``<out_dir>/ecs/subset/<name>``.
    All subsets are then merged and, if the merged subset is non-empty, used
    to filter ``fields``.

    Args:
        fields: Nested field definitions, keyed by field set name.
        subset_file_globs: Globs of subset definition files; when empty no
            filtering takes place.
        out_dir: Base output directory for the per-subset intermediate files.

    Returns:
        A ``(fields, docs_only_fields)`` tuple. ``docs_only_fields`` holds the
        entries marked ``docs_only: True`` in the merged subset (empty dict if
        there are none); those entries are removed from the returned
        ``fields``.

    Raises:
        ValueError: If globs were given but no subset was found, or if a
            subset definition does not match the schema structure.
    """
    subsets: List[Dict[str, Any]] = load_subset_definitions(subset_file_globs)
    for subset in subsets:
        subfields: Dict[str, FieldEntry] = extract_matching_fields(fields, subset['fields'])
        intermediate_files.generate(subfields, os.path.join(out_dir, 'ecs', 'subset', subset['name']), False)

    merged_subset: Dict[str, Any] = combine_all_subsets(subsets)
    if merged_subset:
        fields = extract_matching_fields(fields, merged_subset)

    # Looks for the `docs_only` attribute, which generates a second field subset
    # to pass to the ascii_doc generator.
    # After the second subset is generated, `docs_only: True` fields are removed
    # from the `fields` subset.
    docs_only_fields: Dict[str, FieldEntry] = {}
    docs_only_field_paths = generate_docs_only_paths(merged_subset)
    if docs_only_field_paths:
        docs_only_subset = generate_docs_only_subset(docs_only_field_paths)
        docs_only_fields = extract_matching_fields(fields, docs_only_subset)
        fields = remove_docs_only_entries(docs_only_field_paths, fields)

    return fields, docs_only_fields


def generate_docs_only_subset(paths: List[str]) -> Dict[str, Any]:
    """Build a subset definition out of a list of dotted `docs_only` paths.

    Every path segment but the last becomes a ``{'fields': {...}}`` node and
    the last segment becomes an empty dict, e.g. ``['process.a']`` yields
    ``{'process': {'fields': {'a': {}}}}``.
    """
    docs_only_subset: Dict[str, Any] = {}

    for path in paths:
        # Reversed so that segments can be consumed front-to-back with pop().
        split_path = path.split('.')[::-1]
        current_obj = docs_only_subset

        while len(split_path) > 1:
            temp_path = split_path.pop()
            # A missing *or empty* node is (re)created as a container.
            if not current_obj.get(temp_path):
                current_obj[temp_path] = {'fields': {}}
            current_obj = current_obj[temp_path]['fields']

        current_obj[split_path[-1]] = {}

    return docs_only_subset


def generate_docs_only_paths(
    subset: Dict[str, Any],
    filtered: Optional[Dict[str, Any]] = None,
    parent: Optional[str] = '',
    path: Optional[str] = '',
    paths: Optional[List[str]] = None,
) -> List[str]:
    """Collect the dotted paths of subset fields marked `docs_only: True`.

    Example result: ``['process.same_as_process']``.

    Args:
        subset: Subset definition (or nested ``fields`` dict) to scan.
        filtered: Unused; only forwarded to recursive calls.
        parent: Name of the immediate parent entry; used as the path prefix
            only when ``path`` is empty.
        path: Dotted path of the entry owning ``subset``; empty at top level.
        paths: Accumulator the results are appended to. A fresh list is
            created when omitted, so results never leak between calls.

    Returns:
        The accumulator list, with parents listed before their children.
    """
    if paths is None:
        paths = []

    prefix = path or parent or ''
    for current, options in subset.items():
        # Computed per entry so one sibling's name never bleeds into the next.
        current_path = f'{prefix}.{current}' if prefix else current
        if options.get('docs_only'):
            paths.append(current_path)
        nested = options.get('fields')
        if isinstance(nested, dict):
            generate_docs_only_paths(
                nested,
                filtered=filtered,
                parent=current,
                path=current_path,
                paths=paths,
            )
    return paths


def remove_docs_only_entries(paths: List[str], fields: Dict[str, FieldEntry]) -> Dict[str, FieldEntry]:
    """Remove each dotted path in ``paths`` from ``fields``, in place.

    Paths are resolved by descending through the ``'fields'`` key of every
    intermediate entry, so any nesting depth is supported. A path located
    below an entry that this call already removed is skipped, since it is
    gone together with its ancestor.

    Returns:
        The same ``fields`` object that was passed in.

    Raises:
        KeyError: If a path does not exist in ``fields``.
    """
    removed: List[str] = []
    for path in paths:
        if any(path.startswith(f'{done}.') for done in removed):
            continue
        *parents, leaf = path.split('.')
        container = fields
        for name in parents:
            container = container[name]['fields']
        del container[leaf]
        removed.append(path)
    return fields


def combine_all_subsets(subsets: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge N subsets into one.

    Strips top level 'name' and 'fields' keys as well as non-ECS field
    options, since we can't know how to merge those. The ``'fields'`` dict of
    every subset is stripped in place before being merged.
    """
    merged_subset: Dict[str, Any] = {}
    for subset in subsets:
        strip_non_ecs_options(subset['fields'])
        merge_subsets(merged_subset, subset['fields'])
    return merged_subset


def load_subset_definitions(file_globs: List[str]) -> List[Dict[str, Any]]:
    """Load the subset definitions matched by ``file_globs``.

    Returns an empty list when no globs are given.

    Raises:
        ValueError: If globs were given but the loader returned no subsets.
    """
    if not file_globs:
        return []
    subsets: List[Dict[str, Any]] = loader.load_definitions(file_globs)
    if not subsets:
        raise ValueError('--subset specified, but no subsets found in {}'.format(file_globs))
    return subsets


# Options that survive strip_non_ecs_options(); everything else is dropped
# before subsets are merged.
ecs_options: List[str] = ['fields', 'enabled', 'index', 'docs_only']


def strip_non_ecs_options(subset: Dict[str, Any]) -> None:
    """Drop, in place and recursively, every option not in ``ecs_options``."""
    for key in subset:
        # Re-assigning an existing key does not resize the dict, so this is
        # safe while iterating over it.
        subset[key] = {x: subset[key][x] for x in subset[key] if x in ecs_options}
        if 'fields' in subset[key] and isinstance(subset[key]['fields'], dict):
            strip_non_ecs_options(subset[key]['fields'])


def merge_subsets(a: Dict[str, Any], b: Dict[str, Any]) -> None:
    """Merge field subset definitions together; ``b`` is merged into ``a``.

    Assumes that subsets have been stripped of non-ecs options. Entries that
    exist only in ``b`` are stored in ``a`` by reference (not copied).

    Raises:
        ValueError: If a key present in both subsets has 'fields' in only one
            of them.
    """
    for key in b:
        if key not in a:
            a[key] = b[key]
        elif 'fields' in a[key] and 'fields' in b[key]:
            if b[key]['fields'] == '*':
                # A wildcard on either side wins over an explicit field list.
                a[key]['fields'] = '*'
            elif isinstance(a[key]['fields'], dict) and isinstance(b[key]['fields'], dict):
                merge_subsets(a[key]['fields'], b[key]['fields'])
        elif 'fields' in a[key] or 'fields' in b[key]:
            raise ValueError("Subsets unmergeable: 'fields' found in key '{}' in only one subset".format(key))
        # If both subsets have enabled set to False, this will leave enabled: False in the merged subset
        # Otherwise, enabled is removed and is implicitly true
        if a[key].get('enabled', True) or b[key].get('enabled', True):
            a[key].pop('enabled', None)
        # Same logic from 'enabled' applies to 'index'
        if a[key].get('index', True) or b[key].get('index', True):
            a[key].pop('index', None)


def extract_matching_fields(
    fields: Dict[str, FieldEntry],
    subset_definitions: Dict[str, Any]
) -> Dict[str, FieldEntry]:
    """Keep only the fields named in the subset definition.

    Returns a copy without modifying the input fields dict: each retained
    entry and its ``field_details`` are shallow-copied, and nested ``fields``
    are rebuilt recursively when the subset lists them explicitly. With
    ``'fields': '*'`` the nested fields are kept as they are.

    Every subset option other than ``'fields'`` is written into the entry's
    ``field_details``. If ``field_details`` carries an ``'intermediate'`` key,
    the entry is first turned into a regular custom field and passed to
    ``cleaner.field_cleanup``.

    Raises:
        KeyError: If the subset names an entry that is not in ``fields``.
        ValueError: If the presence of a 'fields' key differs between schema
            and subset, or 'fields' is neither a dict nor ``'*'``.
    """
    retained_fields: Dict[str, FieldEntry] = {x: fields[x].copy() for x in subset_definitions}
    for key, val in subset_definitions.items():
        retained_fields[key]['field_details'] = fields[key]['field_details'].copy()
        for option in val:
            if option != 'fields':
                if 'intermediate' in retained_fields[key]['field_details']:
                    retained_fields[key]['field_details']['intermediate'] = False
                    retained_fields[key]['field_details'].setdefault(
                        'description', 'Intermediate field included by adding option with subset')
                    retained_fields[key]['field_details']['level'] = 'custom'
                    cleaner.field_cleanup(retained_fields[key])
                retained_fields[key]['field_details'][option] = val[option]
        # If the field in the schema has a 'fields' key, we expect a 'fields' key in the subset
        if 'fields' in fields[key]:
            if 'fields' not in val:
                raise ValueError("'fields' key expected, not found in subset for {}".format(key))
            elif isinstance(val['fields'], dict):
                retained_fields[key]['fields'] = extract_matching_fields(fields[key]['fields'], val['fields'])
            elif val['fields'] != "*":
                raise ValueError("Unexpected value '{}' found in 'fields' key".format(val['fields']))
        # If the field in the schema does not have a 'fields' key, there should not be a 'fields' key in the subset
        elif 'fields' in val:
            raise ValueError("'fields' key not expected, found in subset for {}".format(key))
    return retained_fields