scripts/generators/csv_generator.py (67 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import _csv import csv import sys from typing import ( Dict, List, ) from os.path import join from generator import ecs_helpers from _types import ( Field, ) def generate(ecs_flat: Dict[str, Field], version: str, out_dir: str) -> None: ecs_helpers.make_dirs(join(out_dir, 'csv')) sorted_fields = base_first(ecs_flat) save_csv(join(out_dir, 'csv/fields.csv'), sorted_fields, version) def base_first(ecs_flat: Dict[str, Field]) -> List[Field]: base_list: List[Field] = [] sorted_list: List[Field] = [] for field_name in sorted(ecs_flat): if '.' in field_name: sorted_list.append(ecs_flat[field_name]) else: base_list.append(ecs_flat[field_name]) return base_list + sorted_list def save_csv(file: str, sorted_fields: List[Field], version: str) -> None: open_mode: str = "wb" if sys.version_info >= (3, 0): open_mode: str = "w" with open(file, open_mode) as csvfile: schema_writer: _csv._writer = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL, lineterminator='\n') schema_writer.writerow(["ECS_Version", "Indexed", "Field_Set", "Field", "Type", "Level", "Normalization", "Example", "Description"]) for field in sorted_fields: key_parts: List[str] = field['flat_name'].split('.') if len(key_parts) == 1: field_set: str = 'base' else: field_set: str = key_parts[0] indexed: str = str(field.get('index', True)).lower() schema_writer.writerow([ version, indexed, field_set, field['flat_name'], field['type'], field['level'], ', '.join(field['normalize']), field.get('example', ''), field['short'], ]) if 'multi_fields' in field: for mf in field['multi_fields']: schema_writer.writerow([ version, indexed, field_set, mf['flat_name'], mf['type'], field['level'], '', field.get('example', ''), field['short'], ])