# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
import glob
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

import git
import yaml

from generators import ecs_helpers
from _types import (
    Field,
    FieldEntry,
    FieldNestedEntry,
    MultiField,
    SchemaDetails,
)

# Loads main ECS schemas and optional additional schemas.
# They are deeply nested, then merged together.
# This script doesn't fill in defaults other than the bare minimum for a predictable
# deeply nested structure. It doesn't concern itself with what "should be allowed"
# in being a good ECS citizen. It just loads things and merges them together.
# The deeply nested structure returned by this script looks like this.
#
# [schema name]: {
# 'schema_details': {
# 'reusable': ...
# },
# 'field_details': {
# 'type': ...
# },
# 'fields': {
# [field name]: {
# 'field_details': { ... }
# 'fields': {
#
# (dotted key names replaced by deep nesting)
# [field name]: {
# 'field_details': { ... }
# 'fields': {
# }
# }
# }
# }
# }
# Schemas at the top level always have all 3 keys populated.
# Leaf fields only have 'field_details' populated.
# Any intermediate field with other fields nested within it has 'fields' populated.
# Note that intermediate fields rarely have 'field_details' populated, but it's supported.
# Examples of this are 'dns.answers', 'observer.egress'.
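#
# Illustrative example (hypothetical schema, not one that ships with ECS): a flat
# definition like
#
#   - name: example
#     title: Example
#     fields:
#       - name: origin.ip
#         type: ip
#
# loads into roughly:
#
#   {'example': {
#       'schema_details': {'title': 'Example'},
#       'field_details': {'name': 'example', 'node_name': 'example'},
#       'fields': {
#           'origin': {
#               'field_details': {'name': 'origin', 'node_name': 'origin',
#                                 'type': 'object', 'intermediate': True},
#               'fields': {
#                   'ip': {'field_details': {'name': 'origin.ip',
#                                            'node_name': 'ip', 'type': 'ip'}}}}}}}
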
EXPERIMENTAL_SCHEMA_DIR = 'experimental/schemas'

def load_schemas(
    ref: Optional[str] = None,
    included_files: Optional[List[str]] = None
) -> Dict[str, FieldEntry]:
"""Loads ECS and custom schemas. They are returned deeply nested and merged."""
# ECS fields (from git ref or not)
    schema_files_raw: Dict[str, FieldNestedEntry] = (
        load_schemas_from_git(ref) if ref
        else load_schema_files(ecs_helpers.ecs_files())
    )
fields: Dict[str, FieldEntry] = deep_nesting_representation(schema_files_raw)
# Custom additional files
    if included_files:
        print('Loading user defined schemas: {0}'.format(included_files))
        # Copy so the .remove() below doesn't mutate the caller's list.
        included_files = list(included_files)
        # If --ref was provided and --include contains the experimental schemas,
        # load those schemas from the same git ref.
if ref and EXPERIMENTAL_SCHEMA_DIR in included_files:
exp_schema_files_raw: Dict[str, FieldNestedEntry] = load_schemas_from_git(
ref, target_dir=EXPERIMENTAL_SCHEMA_DIR)
exp_fields: Dict[str, FieldEntry] = deep_nesting_representation(exp_schema_files_raw)
fields = merge_fields(fields, exp_fields)
included_files.remove(EXPERIMENTAL_SCHEMA_DIR)
# Remaining additional custom files (never from git ref)
custom_files: List[str] = ecs_helpers.glob_yaml_files(included_files)
custom_fields: Dict[str, FieldEntry] = deep_nesting_representation(load_schema_files(custom_files))
fields = merge_fields(fields, custom_fields)
return fields
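
# Illustrative usage (the ref and paths are made-up examples):
#   fields = load_schemas()  # local ECS schemas only
#   fields = load_schemas(ref='v8.6.0', included_files=['custom/*.yml'])
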
def load_schema_files(files: List[str]) -> Dict[str, FieldNestedEntry]:
fields_nested: Dict[str, FieldNestedEntry] = {}
for f in files:
new_fields: Dict[str, FieldNestedEntry] = read_schema_file(f)
fields_nested = ecs_helpers.safe_merge_dicts(fields_nested, new_fields)
return fields_nested
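
# e.g. (hypothetical paths): load_schema_files(['schemas/base.yml', 'schemas/client.yml'])
# returns {'base': {...}, 'client': {...}}, merging via ecs_helpers.safe_merge_dicts so
# that files redefining an already-loaded schema name fail loudly.
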
def load_schemas_from_git(
    ref: str,
    target_dir: str = 'schemas'
) -> Dict[str, FieldNestedEntry]:
tree: git.objects.tree.Tree = ecs_helpers.get_tree_by_ref(ref)
fields_nested: Dict[str, FieldNestedEntry] = {}
    # The target dir may be missing from the git ref; fail loudly below if it is.
if ecs_helpers.path_exists_in_git_tree(tree, target_dir):
for blob in tree[target_dir].blobs:
if blob.name.endswith('.yml'):
new_fields: Dict[str, FieldNestedEntry] = read_schema_blob(blob, ref)
fields_nested = ecs_helpers.safe_merge_dicts(fields_nested, new_fields)
else:
raise KeyError(f"Target directory './{target_dir}' not present in git ref '{ref}'!")
return fields_nested
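
# e.g. (the ref below is a made-up example): load_schemas_from_git('v8.6.0') reads every
# .yml blob under ./schemas at that ref, without touching the working tree.
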
def read_schema_file(file_name: str) -> Dict[str, FieldNestedEntry]:
"""Read a raw schema yml file into a dict."""
with open(file_name) as f:
raw: List[FieldNestedEntry] = yaml.safe_load(f.read())
return nest_schema(raw, file_name)

def read_schema_blob(
blob: git.objects.blob.Blob,
ref: str
) -> Dict[str, FieldNestedEntry]:
"""Read a raw schema yml git blob into a dict."""
content: str = blob.data_stream.read().decode('utf-8')
raw: List[FieldNestedEntry] = yaml.safe_load(content)
file_name: str = "{} (git ref {})".format(blob.name, ref)
return nest_schema(raw, file_name)

def nest_schema(raw: List[FieldNestedEntry], file_name: str) -> Dict[str, FieldNestedEntry]:
"""
Raw schema files are an array of schema details: [{'name': 'base', ...}]
This function loops over the array (usually 1 schema per file) and turns it into
a dict with the schema name as the key: { 'base': { 'name': 'base', ...}}
"""
fields: Dict[str, FieldNestedEntry] = {}
for schema in raw:
if 'name' not in schema:
raise ValueError("Schema file {} is missing mandatory attribute 'name'".format(file_name))
fields[schema['name']] = schema
return fields

def deep_nesting_representation(fields: Dict[str, FieldNestedEntry]) -> Dict[str, FieldEntry]:
deeply_nested: Dict[str, FieldEntry] = {}
for (name, flat_schema) in fields.items():
# We destructively select what goes into schema_details and child fields.
# The rest is 'field_details'.
flat_schema = flat_schema.copy()
flat_schema['node_name'] = flat_schema['name']
# Schema-only details. Not present on other nested field groups.
schema_details: SchemaDetails = {}
for schema_key in ['root', 'group', 'reusable', 'title']:
if schema_key in flat_schema:
schema_details[schema_key] = flat_schema.pop(schema_key)
nested_schema = nest_fields(flat_schema.pop('fields', []))
# Re-assemble new structure
deeply_nested[name] = {
'schema_details': schema_details,
# What's still in flat_schema is the field_details for the field set itself
'field_details': flat_schema,
'fields': nested_schema['fields']
}
return deeply_nested
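
# Illustrative split (hypothetical input):
#   {'client': {'name': 'client', 'title': 'Client', 'group': 2, 'type': 'group', 'fields': [...]}}
# becomes:
#   {'client': {'schema_details': {'title': 'Client', 'group': 2},
#               'field_details': {'name': 'client', 'node_name': 'client', 'type': 'group'},
#               'fields': {...}}}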

def nest_fields(field_array: List[Field]) -> Dict[str, Dict[str, FieldEntry]]:
schema_root: Dict[str, Dict[str, FieldEntry]] = {'fields': {}}
for field in field_array:
nested_levels: List[str] = field['name'].split('.')
parent_fields: List[str] = nested_levels[:-1]
leaf_field: str = nested_levels[-1]
# "nested_schema" is a cursor we move within the schema_root structure we're building.
# Here we reset the cursor for this new field.
nested_schema = schema_root['fields']
for idx, level in enumerate(parent_fields):
nested_schema.setdefault(level, {})
# Where nested fields will live
nested_schema[level].setdefault('fields', {})
            # Ensure the intermediate parent has field_details; 'type: object' is made explicit below.
nested_schema[level].setdefault('field_details', {})
field_details = nested_schema[level]['field_details']
field_details['node_name'] = level
# Respect explicitly defined object fields
if 'type' in field_details and field_details['type'] in ['object', 'nested']:
field_details.setdefault('intermediate', False)
else:
field_details.setdefault('type', 'object')
field_details.setdefault('name', '.'.join(parent_fields[:idx + 1]))
field_details.setdefault('intermediate', True)
            # Move the nested_schema cursor one level deeper.
nested_schema = nested_schema[level]['fields']
nested_schema.setdefault(leaf_field, {})
        # Store the leaf field's name as 'node_name'. The dotted 'name' is kept as-is;
        # the flat_name is already computed.
field['node_name'] = leaf_field
nested_schema[leaf_field]['field_details'] = field
return schema_root
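
# Illustrative example (hypothetical field): nest_fields([{'name': 'answers.ttl', 'type': 'long'}])
# returns:
#   {'fields': {'answers': {
#       'field_details': {'name': 'answers', 'node_name': 'answers',
#                         'type': 'object', 'intermediate': True},
#       'fields': {'ttl': {'field_details': {'name': 'answers.ttl',
#                                            'node_name': 'ttl', 'type': 'long'}}}}}}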

def array_of_maps_to_map(array_vals: List[MultiField]) -> Dict[str, MultiField]:
ret_map: Dict[str, MultiField] = {}
for map_val in array_vals:
name: str = map_val['name']
        # If multiple entries share the same 'name' within one custom definition, the last one wins.
ret_map[name] = map_val
return ret_map

def map_of_maps_to_array(map_vals: Dict[str, MultiField]) -> List[MultiField]:
    return sorted(map_vals.values(), key=lambda k: k['name'])

def dedup_and_merge_lists(list_a: List[MultiField], list_b: List[MultiField]) -> List[MultiField]:
list_a_map: Dict[str, MultiField] = array_of_maps_to_map(list_a)
list_a_map.update(array_of_maps_to_map(list_b))
return map_of_maps_to_array(list_a_map)
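
# Illustrative example (hypothetical multi_fields): merging
#   [{'name': 'text', 'type': 'text'}]  with  [{'name': 'text', 'type': 'match_only_text'}]
# yields [{'name': 'text', 'type': 'match_only_text'}]: on a name clash, list_b's entry wins.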

def merge_fields(a: Dict[str, FieldEntry], b: Dict[str, FieldEntry]) -> Dict[str, FieldEntry]:
"""Merge ECS field sets with custom field sets."""
a = copy.deepcopy(a)
b = copy.deepcopy(b)
for key in b:
if key not in a:
a[key] = b[key]
continue
# merge field details
if 'normalize' in b[key]['field_details']:
a[key].setdefault('field_details', {})
a[key]['field_details'].setdefault('normalize', [])
a[key]['field_details']['normalize'].extend(b[key]['field_details'].pop('normalize'))
if 'multi_fields' in b[key]['field_details']:
a[key].setdefault('field_details', {})
a[key]['field_details'].setdefault('multi_fields', [])
a[key]['field_details']['multi_fields'] = dedup_and_merge_lists(
a[key]['field_details']['multi_fields'], b[key]['field_details']['multi_fields'])
            # Delete multi_fields from b; otherwise the update() below would overwrite
            # a's merged multi_fields with b's original, unmerged contents.
            del b[key]['field_details']['multi_fields']
a[key]['field_details'].update(b[key]['field_details'])
# merge schema details
if 'schema_details' in b[key]:
asd = a[key]['schema_details']
bsd = b[key]['schema_details']
if 'reusable' in b[key]['schema_details']:
asd.setdefault('reusable', {})
if 'top_level' in bsd['reusable']:
asd['reusable']['top_level'] = bsd['reusable']['top_level']
else:
asd['reusable'].setdefault('top_level', True)
if 'order' in bsd['reusable']:
asd['reusable']['order'] = bsd['reusable']['order']
asd['reusable'].setdefault('expected', [])
                asd['reusable']['expected'].extend(bsd['reusable'].get('expected', []))
bsd.pop('reusable')
asd.update(bsd)
# merge nested fields
if 'fields' in b[key]:
a[key].setdefault('fields', {})
a[key]['fields'] = merge_fields(a[key]['fields'], b[key]['fields'])
return a
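
# Illustrative merge (hypothetical field sets): if both a and b define 'client' and b's
# field_details carry 'normalize': ['lowercase'], the merged entry extends a's 'normalize'
# list with it; any other scalar field_details in b simply overwrite a's.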

def load_yaml_file(file_name: str) -> Any:
    with open(file_name) as f:
        return yaml.safe_load(f.read())

# You know, for silent tests
def warn(message: str) -> None:
print(message)

def eval_globs(globs: List[str]) -> List[str]:
"""Accepts an array of glob patterns or file names, returns the array of actual files"""
all_files = []
for g in globs:
if g.endswith('/'):
g += '*'
new_files = glob.glob(g)
        if not new_files:
warn("{} did not match any files".format(g))
else:
all_files.extend(new_files)
return all_files
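
# e.g. (hypothetical layout): eval_globs(['custom/']) expands the pattern to 'custom/*'
# and warns, rather than failing, when a pattern matches no files.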

def load_definitions(file_globs: List[str]) -> List[Any]:
sets = []
for f in ecs_helpers.glob_yaml_files(file_globs):
raw = load_yaml_file(f)
sets.append(raw)
return sets