scripts/schema/exclude_filter.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import (
    Dict,
    List,
)

from schema import loader

from _types import (
    Field,
    FieldEntry,
    FieldNestedEntry,
)

# This script should be run downstream of the subset filters - it takes
# all ECS and custom fields already loaded by those filters and explicitly
# removes a subset, for example, to simulate the impact of future removals.


def exclude(fields: Dict[str, FieldEntry], exclude_file_globs: List[str]) -> Dict[str, FieldEntry]:
    """Loads the exclude definitions and removes the matching fields from the loaded field set."""
    excludes: List[FieldNestedEntry] = load_exclude_definitions(exclude_file_globs)
    if excludes:
        fields = exclude_fields(fields, excludes)
    return fields
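

# The exclude definitions loaded by exclude() above are ordinary field-definition
# YAML files. A hypothetical exclude file (the names are illustrative, not real
# schema content) such as:
#
#   - name: process
#     fields:
#       - name: parent.pid
#
# is turned by loader.load_definitions() into roughly
# [[{'name': 'process', 'fields': [{'name': 'parent.pid'}]}]],
# which is the nested list/dict shape that exclude_fields() below walks.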


def long_path(path_as_list: List[str]) -> str:
    """Joins path segments into a dotted name, e.g. ['process', 'pid'] -> 'process.pid'."""
    return '.'.join(path_as_list)


def pop_field(
    fields: Dict[str, FieldEntry],
    node_path: List[str],
    path: List[str],
    removed: List[str]
) -> str:
    """Pops a field from the yaml-derived dict, using a path built from an ordered list of nodes; returns the flat name of the removed field."""
    if node_path[0] in fields:
        if len(node_path) == 1:
            flat_name: str = long_path(path)
            fields.pop(node_path[0])
            return flat_name
        else:
            inner_field: str = node_path.pop(0)
            if 'fields' in fields[inner_field]:
                popped: str = pop_field(fields[inner_field]['fields'], node_path, path, removed)
                # if object field with no remaining fields and not 'base', pop it
                if fields[inner_field]['fields'] == {} and inner_field != 'base':
                    fields.pop(inner_field)
                return popped
            else:
                raise ValueError(
                    '--exclude specified, but no path to field {} found'.format(long_path(path)))
    else:
        this_long_path: str = long_path(path)
        # Tolerate a missing field when one of its ancestors has already been removed
        if not any(this_long_path.startswith(removed_path) for removed_path in removed if removed_path is not None):
            raise ValueError('--exclude specified, but no field {} found'.format(this_long_path))
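

# A minimal sketch of pop_field on a toy yaml-derived dict (never called here;
# the field names 'process', 'parent' and 'pid' are made up for illustration):
def _sketch_pop_field() -> None:
    fields = {'process': {'fields': {'parent': {'fields': {'pid': {}}}}}}
    removed: List[str] = []
    flat_name = pop_field(fields, ['process', 'parent', 'pid'], ['process', 'parent', 'pid'], removed)
    assert flat_name == 'process.parent.pid'
    # the emptied 'parent' and 'process' containers are pruned on the way back up
    assert fields == {}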


def exclude_trace_path(
    fields: Dict[str, FieldEntry],
    item: List[Field],
    path: List[str],
    removed: List[str]
) -> None:
    """Traverses paths to one or more nodes in a yaml-derived dict, removing each addressed field."""
    for list_item in item:
        node_path: List[str] = path.copy()
        # cater for name.with.dots
        for name in list_item['name'].split('.'):
            node_path.append(name)
        if 'fields' not in list_item:
            parent: str = node_path[0]
            removed.append(pop_field(fields, node_path, node_path.copy(), removed))
            # if parent field has no remaining fields and not 'base', pop it
            if parent != 'base' and parent in fields and len(fields[parent]['fields']) == 0:
                fields.pop(parent)
        else:
            raise ValueError("--exclude specified, can't parse fields in file {}".format(item))
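

# Example call shape, as issued by exclude_fields() below for the (illustrative)
# exclude entry {'name': 'process', 'fields': [{'name': 'parent.pid'}]}:
#
#   exclude_trace_path(fields, [{'name': 'parent.pid'}], ['process'], [])
#
# which pops 'process.parent.pid' and then prunes the 'process' object itself
# if no fields remain under it.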


def exclude_fields(fields: Dict[str, FieldEntry], excludes: List[FieldNestedEntry]) -> Dict[str, FieldEntry]:
    """Traverses fields and eliminates any field which matches the excludes."""
    if excludes:
        for ex_list in excludes:
            for item in ex_list:
                exclude_trace_path(fields, item['fields'], [item['name']], [])
    return fields
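

# End-to-end sketch of the exclusion on in-memory data (never called here; the
# schema and field names are invented for illustration, not real ECS content):
def _sketch_exclude_fields() -> None:
    fields = {
        'process': {
            'fields': {
                'pid': {},
                'parent': {'fields': {'pid': {}}},
            }
        }
    }
    excludes = [[{'name': 'process', 'fields': [{'name': 'parent.pid'}]}]]
    result = exclude_fields(fields, excludes)
    # 'process.parent.pid' is removed and the emptied 'parent' object is pruned,
    # while 'process.pid' is left untouched
    assert 'parent' not in result['process']['fields']
    assert 'pid' in result['process']['fields']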


def load_exclude_definitions(file_globs: List[str]) -> List[FieldNestedEntry]:
    """Loads the exclude definition files matched by the given globs, failing loudly if nothing is found."""
    if not file_globs:
        return []
    excludes: List[FieldNestedEntry] = loader.load_definitions(file_globs)
    if not excludes:
        raise ValueError('--exclude specified, but no exclusions found in {}'.format(file_globs))
    return excludes
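

# For example (the glob is hypothetical), load_exclude_definitions(['exclude-set/*.yml'])
# hands the globs to loader.load_definitions(); an empty result raises immediately,
# so a mistyped --exclude path fails fast instead of silently excluding nothing.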