in geneve/utils/__init__.py [0:0]
def load_integration_schema(name, kibana_version):
    """Build a flat field schema for the integration package *name*.

    The package is looked up through ``epr.EPR.search_package``; every
    ``fields.yml`` found (recursively) in the fetched package resource is
    parsed and flattened into a single mapping.

    Args:
        name: Package name passed to the registry search.
        kibana_version: Optional version used to constrain the search. A
            falsy value or ``"serverless"`` adds no constraint; a trailing
            ``-SNAPSHOT`` is stripped before the value is used.

    Returns:
        A dict mapping dotted field paths (e.g. ``"a.b.c"``) to
        ``{"type": <field type>}``, with ``"normalize": ["array"]`` added when
        the field's ``example`` is a list.

    Raises:
        IndexError: The registry search returned no package.
        KeyError: A field definition lacks ``name`` or ``type``, or a group
            has neither ``fields`` nor ``field``.
    """
    from ruamel.yaml import YAML

    conditions = {}
    if kibana_version and str(kibana_version) != "serverless":
        if str(kibana_version).endswith("-SNAPSHOT"):
            kibana_version = str(kibana_version)[: -len("-SNAPSHOT")]
        conditions["kibana.version"] = kibana_version

    e = epr.EPR(timeout=17, tries=3)
    res = e.search_package(name, **conditions)
    if not res:
        # Same exception type as the bare ``res[0]`` lookup, but with context.
        raise IndexError(f"no package found for integration {name!r} (conditions: {conditions})")
    # The download location is a path on the registry host: keep the scheme
    # and host of the registry URL and swap in the package path.
    uri = urlunparse(urlparse(e.url)._replace(path=res[0]["download"]))

    def is_array(tree):
        """Tell whether the field's ``example`` denotes a list value."""
        example = tree.get("example")
        if isinstance(example, list):
            # YAML already parsed the example as a sequence.
            return True
        if not isinstance(example, str):
            # Missing example, or a scalar (number, bool, ...) parsed by YAML.
            return False
        try:
            return isinstance(json.loads(example), list)
        except ValueError:
            # Free-text examples (IPs, hostnames, ...) are not JSON at all.
            return False

    def field_schema(tree, path=()):
        """Yield ``(dotted_path, schema)`` pairs for *tree* and its children."""
        path = path + (tree["name"],)
        if tree["type"] == "group":
            try:
                children = tree["fields"]
            except KeyError:
                # Fall back to the singular key when "fields" is absent.
                children = tree["field"]
            for child in children:
                yield from field_schema(child, path)
        else:
            field = {"type": tree["type"]}
            if is_array(tree):
                field["normalize"] = ["array"]
            yield ".".join(path), field

    yaml = YAML(typ="safe")
    schema = {}
    with resource(uri, cachedir=dirs.cache) as resource_dir:
        for fields_yml in resource_dir.glob("**/fields.yml"):
            # Explicit encoding so parsing does not depend on the locale.
            with open(fields_yml, encoding="utf-8") as f:
                # An empty document loads as None; treat it as "no fields".
                trees = yaml.load(f) or []
            for tree in trees:
                schema.update(field_schema(tree))
    return schema