libbeat/scripts/generate_fields_docs.py (137 lines of code) (raw):

import argparse from collections import OrderedDict from functools import lru_cache import os import re import requests import yaml def document_fields(output, section, sections, path): if "skipdocs" in section: return if "anchor" in section: output.write("[[exported-fields-{}]]\n".format(section["anchor"])) if "prefix" in section: output.write("{}\n".format(section["prefix"])) # Intermediate level titles if ("description" in section and "prefix" not in section and "anchor" not in section): output.write("[float]\n") if "description" in section: if "anchor" in section and section["name"] == "ECS": output.write("== {} fields\n\n".format(section["name"])) output.write(""" This section defines Elastic Common Schema (ECS) fields—a common set of fields to be used when storing event data in {es}. This is an exhaustive list, and fields listed here are not necessarily used by {beatname_uc}. The goal of ECS is to enable and encourage users of {es} to normalize their event data, so that they can better analyze, visualize, and correlate the data represented in their events. See the {ecs-ref}[ECS reference] for more information. """) elif "anchor" in section: output.write("== {} fields\n\n".format(section["name"])) output.write("{}\n\n".format(section["description"])) else: output.write("=== {}\n\n".format(section["name"])) output.write("{}\n\n".format(section["description"])) if "fields" not in section or not section["fields"]: return output.write("\n") for field in section["fields"]: # Skip entries which do not define a name if "name" not in field: continue if path == "": newpath = field["name"] else: newpath = path + "." + field["name"] if "type" in field and field["type"] == "group": document_fields(output, field, sections, newpath) else: document_field(output, field, newpath) def document_field(output, field, field_path): if "field_path" not in field: field["field_path"] = field_path output.write("*`{}`*::\n+\n--\n".format(field["field_path"])) if "deprecated" in field: output.write("\ndeprecated:[{}]\n\n".format(field["deprecated"])) if "description" in field: output.write("{}\n\n".format(field["description"])) if "type" in field: output.write("type: {}\n\n".format(field["type"])) if "example" in field: output.write("example: {}\n\n".format(field["example"])) if "format" in field: output.write("format: {}\n\n".format(field["format"])) if "required" in field: output.write("required: {}\n\n".format(field["required"])) if "path" in field: output.write("alias to: {}\n\n".format(field["path"])) # For Apm-Server docs only # Assign an ECS badge for ECS fields if beat_title == "Apm-Server": if field_path in ecs_fields(): output.write("{yes-icon} {ecs-ref}[ECS] field.\n\n") if "index" in field: if not field["index"]: output.write("{}\n\n".format("Field is not indexed.")) if "enabled" in field: if not field["enabled"]: output.write("{}\n\n".format("Object is not enabled.")) output.write("--\n\n") if "multi_fields" in field: for subfield in field["multi_fields"]: document_field(output, subfield, field_path + "." + subfield["name"]) @lru_cache(maxsize=None) def ecs_fields(): """ Fetch flattened ECS fields based on ECS 1.6.0 spec The result of this function is cached """ url = "https://raw.githubusercontent.com/elastic/ecs/v1.6.0/generated/ecs/ecs_flat.yml" resp = requests.get(url) if resp.status_code != 200: raise ValueError(resp.content) return yaml.load(resp.content, Loader=yaml.FullLoader) def fields_to_asciidoc(input, output, beat): dict = {'beat': beat} output.write(""" //// This file is generated! See _meta/fields.yml and scripts/generate_fields_docs.py //// :edit_url: [[exported-fields]] = Exported fields [partintro] -- This document describes the fields that are exported by {beat}. They are grouped in the following categories: """.format(**dict)) docs = yaml.load(input, Loader=yaml.FullLoader) # fields file is empty if docs is None: print("fields.yml file is empty. fields.asciidoc cannot be generated.") return # deduplicate fields, last one wins for section in docs: if not section.get("fields"): continue fields = OrderedDict() for field in section["fields"]: name = field["name"] if name in fields: assert field["type"] == fields[name]["type"], 'field "{}" redefined with different type "{}"'.format( name, field["type"]) fields[name].update(field) else: fields[name] = field section["fields"] = list(fields.values()) # Create sections from available fields sections = {} for v in docs: sections[v["key"]] = v["title"] for section in sorted(docs, key=lambda field: field["key"]): if "anchor" not in section: section["anchor"] = section["key"] if "skipdocs" not in section: output.write("* <<exported-fields-{}>>\n".format(section["anchor"])) output.write("\n--\n") # Sort alphabetically by key for section in sorted(docs, key=lambda field: field["key"]): section["name"] = section["title"] if "anchor" not in section: section["anchor"] = section["key"] document_fields(output, section, sections, "") output.write(":edit_url!:") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generates the documentation for a Beat.") parser.add_argument("fields", help="Path to fields.yml") parser.add_argument("beattitle", help="The beat title") parser.add_argument("es_beats", help="The path to the general beats folder") parser.add_argument("--output_path", default="", dest="output_path", help="Output path, if different from path") args = parser.parse_args() fields_yml = args.fields beat_title = args.beattitle.title() es_beats = args.es_beats # Read fields.yml with open(fields_yml, encoding='utf-8') as f: fields = f.read() output = open(os.path.join(args.output_path, "docs/fields.asciidoc"), 'w', encoding='utf-8') try: fields_to_asciidoc(fields, output, beat_title) finally: output.close()