scripts/generators/otel.py (248 lines of code) (raw):
import git
import os
import shutil
from typing import (
Dict,
List,
)
import yaml
from schema import visitor
from generators import ecs_helpers
from _types import (
OTelModelFile,
OTelMapping,
OTelAttribute,
FieldEntry,
FieldNestedEntry,
Field,
OTelMappingSummary,
OTelGroup,
)
# Git URL of the OpenTelemetry Semantic Conventions repository that the
# generator loads its model files from.
OTEL_SEMCONV_GIT = "https://github.com/open-telemetry/semantic-conventions.git"
# Local directory the repository is cloned into; an existing clone found here
# is reused when it already knows the requested tag or branch.
LOCAL_TARGET_DIR_OTEL_SEMCONV = "./build/otel-semconv/"
def get_model_files(
    git_repo: str,
    semconv_version: str,
) -> List[OTelModelFile]:
    """Load the OpenTelemetry Semantic Conventions model files for one version.

    Resolves the git tree of ``git_repo`` at the ref ``semconv_version`` and
    collects every parsed YAML file found below its ``model`` directory.

    Raises:
        KeyError: if the resolved tree has no ``model`` directory.
    """
    model_dir = "model"
    root: git.objects.tree.Tree = get_tree_by_url(git_repo, semconv_version)

    # Guard clause: without the model directory there is nothing to collect.
    if not ecs_helpers.path_exists_in_git_tree(root, model_dir):
        raise KeyError(f"Target directory './{model_dir}' not present in git '{git_repo}'!")

    return collectOTelModelFiles(root[model_dir])
def get_attributes(
    model_files: List[OTelModelFile]
) -> Dict[str, OTelAttribute]:
    """Retrieves (non-deprecated) OTel attributes from the model files.

    Only groups of type ``attribute_group`` that are not deprecated are
    considered, and within them only attributes that carry an ``id`` and are
    not deprecated. When the group declares a ``prefix`` the attribute id
    becomes ``<prefix>.<id>``; when it declares a ``display_name`` that name is
    stored on the attribute as ``group_display_name``.

    Args:
        model_files: Parsed model documents. Entries that are not mappings
            (e.g. an empty YAML document parsed to ``None``) or that have no
            ``groups`` are skipped.

    Returns:
        Attributes keyed by their fully qualified id. The values are copies,
        so the passed model files are left unmodified and calling this
        function repeatedly yields the same result.
    """
    attributes: Dict[str, OTelAttribute] = {}
    for model_file in model_files:
        # Empty YAML documents load as None; such files define no attributes.
        if not isinstance(model_file, dict):
            continue
        for group in model_file.get('groups') or []:
            if group.get('type') != 'attribute_group' or 'deprecated' in group:
                continue
            for attribute in group.get('attributes') or []:
                if 'id' not in attribute or 'deprecated' in attribute:
                    continue
                # Copy before qualifying the id: mutating the model in place
                # would prefix the same attribute again on a second call.
                resolved: OTelAttribute = dict(attribute)
                if 'prefix' in group:
                    resolved['id'] = group['prefix'] + "." + attribute['id']
                if 'display_name' in group:
                    resolved['group_display_name'] = group['display_name']
                attributes[resolved['id']] = resolved
    return attributes
def get_metrics(
    model_files: List[OTelModelFile]
) -> Dict[str, OTelGroup]:
    """Retrieves (non-deprecated) OTel metrics from the model files.

    A group counts as a metric when its ``type`` is ``metric``, it has a
    ``metric_name`` and it is not marked ``deprecated``.

    Args:
        model_files: Parsed model documents. Entries that are not mappings
            (e.g. an empty YAML document parsed to ``None``) or that have no
            ``groups`` are skipped.

    Returns:
        The metric groups keyed by their ``metric_name``.
    """
    metrics: Dict[str, OTelGroup] = {}
    for model_file in model_files:
        # Empty YAML documents load as None; such files define no metrics.
        if not isinstance(model_file, dict):
            continue
        for group in model_file.get('groups') or []:
            if group.get('type') == 'metric' and 'metric_name' in group and 'deprecated' not in group:
                metrics[group['metric_name']] = group
    return metrics
def collectOTelModelFiles(
    tree: git.objects.tree.Tree,
    level=0
) -> List[OTelModelFile]:
    """Recursively gather all parsed YAML files below a git tree.

    Sub-trees are descended into (``level`` is incremented on each descent),
    blobs named ``*.yml`` or ``*.yaml`` are decoded as UTF-8 and parsed with
    ``yaml.safe_load``; every other entry is ignored.
    """
    yaml_suffixes = ('.yml', '.yaml')
    collected: List[OTelModelFile] = []
    for node in tree:
        kind = node.type
        if kind == "tree":
            collected += collectOTelModelFiles(node, level + 1)
            continue
        if kind != "blob" or not node.name.endswith(yaml_suffixes):
            continue
        raw_text: str = node.data_stream.read().decode('utf-8')
        parsed: OTelModelFile = yaml.safe_load(raw_text)
        collected.append(parsed)
    return collected
def get_tree_by_url(
    url: str,
    git_ref: str,
) -> git.objects.tree.Tree:
    """Return the git tree of the repository at ``url`` checked out at ``git_ref``.

    A clone already present in ``LOCAL_TARGET_DIR_OTEL_SEMCONV`` is reused when
    ``git_ref`` is one of its tags or branches. Otherwise the directory is
    removed (if it exists) and the repository is cloned from ``url`` again.
    In both cases ``git_ref`` is checked out and the tree of the resulting
    HEAD commit is returned.
    """
    repo: git.repo.base.Repo
    needs_clone = True

    if os.path.exists(LOCAL_TARGET_DIR_OTEL_SEMCONV):
        repo = git.Repo(LOCAL_TARGET_DIR_OTEL_SEMCONV)
        ref_is_known = git_ref in repo.tags or git_ref in repo.branches
        if ref_is_known:
            needs_clone = False
        else:
            # The cached clone does not know the requested ref: start over.
            shutil.rmtree(LOCAL_TARGET_DIR_OTEL_SEMCONV)

    if needs_clone:
        print(f'Loading OpenTelemetry Semantic Conventions version "{git_ref}"')
        repo = git.Repo.clone_from(url, LOCAL_TARGET_DIR_OTEL_SEMCONV)

    repo.git.checkout(git_ref)
    return repo.head.commit.tree
def get_otel_attribute_name(
    field: Field,
    otel: OTelMapping
) -> str:
    """Resolve the OTel attribute name an OTel mapping of ``field`` points to.

    A ``match`` relation means the attribute is named like the ECS field
    (``flat_name``); otherwise the mapping's explicit ``attribute`` is used.

    Raises:
        KeyError: if the mapping is a metric mapping or names no attribute.
    """
    relation = otel['relation']
    if relation == 'match':
        return field['flat_name']
    if 'attribute' in otel:
        return otel['attribute']
    if 'metric' in otel:
        raise KeyError("Passed OTel mapping is of type 'metric', expected 'attribute' here!")
    raise KeyError(
        f"On field '{field['flat_name']}': Cannot retrieve attribute name for an OTel mapping with relation '{relation}'!")
def must_have(ecs_field_name, otel, relation_type, property):
    """Ensure the OTel mapping ``otel`` contains the key ``property``.

    Raises:
        ValueError: if ``property`` is missing; the message names the ECS
            field and the relation type.
    """
    if property in otel:
        return
    message = (
        f"On field '{ecs_field_name}': An OTel mapping with relation type '{relation_type}' must specify the property '{property}'!")
    raise ValueError(message)
def must_not_have(ecs_field_name, otel, relation_type, property):
    """Ensure the OTel mapping ``otel`` does not contain the key ``property``.

    Raises:
        ValueError: if ``property`` is present; the message names the ECS
            field and the relation type.
    """
    if property not in otel:
        return
    message = (
        f"On field '{ecs_field_name}': An OTel mapping with relation type '{relation_type}' must not have the property '{property}'!")
    raise ValueError(message)
class OTelGenerator:
    """Validates the OTel mappings of ECS fields and summarizes them.

    On construction the attributes and metrics of one OpenTelemetry Semantic
    Conventions version are loaded; they are the reference that the ``otel``
    mappings declared on ECS fields are checked against.
    """

    # Summary counter that is incremented for each mapping relation type.
    _RELATION_COUNTERS = {
        'match': 'nr_matching_fields',
        'equivalent': 'nr_equivalent_fields',
        'related': 'nr_related_fields',
        'metric': 'nr_metric_fields',
        'conflict': 'nr_conflicting_fields',
        'na': 'nr_not_applicable_fields',
        'otlp': 'nr_otlp_fields',
    }

    def __init__(self, semconv_version: str):
        """Load the semantic conventions model for ``semconv_version``.

        The model files are obtained through ``get_model_files`` from
        ``OTEL_SEMCONV_GIT``.
        """
        model_files = get_model_files(OTEL_SEMCONV_GIT, semconv_version)
        self.attributes: Dict[str, OTelAttribute] = get_attributes(model_files)
        self.otel_attribute_names = list(self.attributes.keys())
        self.metrics: Dict[str, OTelGroup] = get_metrics(model_files)
        self.otel_metric_names = list(self.metrics.keys())
        self.semconv_version = semconv_version

    def __set_stability(self, details):
        """Copy the stability of the mapped metric/attribute onto each mapping.

        Mappings that reference neither a metric nor an attribute (relation
        ``otlp`` or ``na``) are left as they are.
        """
        field_details = details['field_details']
        if 'flat_name' not in field_details or 'otel' not in field_details:
            return
        for otel in field_details['otel']:
            if otel['relation'] == 'metric':
                otel['stability'] = self.metrics[otel['metric']]['stability']
            elif otel['relation'] == 'match' or 'attribute' in otel:
                otel['stability'] = self.attributes[get_otel_attribute_name(field_details, otel)]['stability']

    def __check_metric_name(self, field_name, metric_name):
        """Raise ValueError when ``metric_name`` is not a known OTel metric."""
        if metric_name not in self.otel_metric_names:
            raise ValueError(
                f"On field '{field_name}': Metric '{metric_name}' does not exist in Semantic Conventions version {self.semconv_version}!")

    def __check_attribute_name(self, field_details, otel):
        """Raise ValueError when the mapped attribute is not a known OTel attribute."""
        otel_attr_name = get_otel_attribute_name(field_details, otel)
        if otel_attr_name not in self.otel_attribute_names:
            raise ValueError(
                f"On field '{field_details['flat_name']}': Attribute '{otel_attr_name}' does not exist in Semantic Conventions version {self.semconv_version}!")

    def __check_mapping(self, details):
        """Validate all OTel mappings declared on a single ECS field.

        Fields without ``flat_name`` and intermediate fields are skipped. Each
        mapping must have a ``relation``; depending on the relation certain
        properties are required or forbidden, and referenced metrics and
        attributes must exist in the loaded semantic conventions. A field
        without any mapping whose name equals an OTel attribute name only
        triggers a printed warning.

        Raises:
            ValueError: on the first violated rule.
        """
        field_details = details['field_details']
        if 'flat_name' not in field_details or field_details.get('intermediate'):
            return

        ecs_field_name = field_details['flat_name']
        if 'otel' not in field_details:
            if ecs_field_name in self.otel_attribute_names:
                print(
                    f'WARNING: Field "{ecs_field_name}" exists in OTel Semantic Conventions with exactly the same name but is not mapped in ECS!')
            return

        for otel in field_details['otel']:
            if 'relation' not in otel:
                raise ValueError(
                    f"On field '{field_details['flat_name']}': OTel mapping must specify the 'relation' property!")

            relation = otel['relation']
            if relation == 'metric':
                must_have(ecs_field_name, otel, relation, 'metric')
                must_not_have(ecs_field_name, otel, relation, 'attribute')
                must_not_have(ecs_field_name, otel, relation, 'otlp_field')
                must_not_have(ecs_field_name, otel, relation, 'stability')
                self.__check_metric_name(ecs_field_name, otel['metric'])
            elif relation == 'otlp':
                must_have(ecs_field_name, otel, relation, 'otlp_field')
                must_have(ecs_field_name, otel, relation, 'stability')
                must_not_have(ecs_field_name, otel, relation, 'attribute')
                must_not_have(ecs_field_name, otel, relation, 'metric')
            elif relation == 'na':
                must_not_have(ecs_field_name, otel, relation, 'otlp_field')
                must_not_have(ecs_field_name, otel, relation, 'attribute')
                must_not_have(ecs_field_name, otel, relation, 'metric')
                must_not_have(ecs_field_name, otel, relation, 'stability')
            elif relation == 'match':
                must_not_have(ecs_field_name, otel, relation, 'otlp_field')
                must_not_have(ecs_field_name, otel, relation, 'attribute')
                must_not_have(ecs_field_name, otel, relation, 'metric')
                must_not_have(ecs_field_name, otel, relation, 'stability')
                # A match claims an OTel attribute with the field's own name;
                # verify it here so a wrong claim is reported as a descriptive
                # ValueError instead of a bare KeyError when the stability is
                # looked up afterwards.
                self.__check_attribute_name(field_details, otel)
            elif relation in ('equivalent', 'related', 'conflict'):
                must_have(ecs_field_name, otel, relation, 'attribute')
                must_not_have(ecs_field_name, otel, relation, 'otlp_field')
                must_not_have(ecs_field_name, otel, relation, 'metric')
                must_not_have(ecs_field_name, otel, relation, 'stability')
                self.__check_attribute_name(field_details, otel)
            else:
                raise ValueError(
                    f"On field '{field_details['flat_name']}': Invalid relation type '{otel['relation']}'")

    def validate_otel_mapping(
        self,
        field_entries: Dict[str, FieldEntry]
    ) -> None:
        """Validate the OTel mappings of all fields, then set their stability.

        Both steps visit ``field_entries`` via ``visitor.visit_fields``; the
        stability is only filled in after every field passed validation.
        """
        visitor.visit_fields(field_entries, None, self.__check_mapping)
        visitor.visit_fields(field_entries, None, self.__set_stability)

    @staticmethod
    def __new_summary(namespace, title) -> OTelMappingSummary:
        """Create a summary with every counter present and set to zero."""
        summary: OTelMappingSummary = {}
        summary['namespace'] = namespace
        summary['title'] = title
        summary['nr_all_ecs_fields'] = 0
        summary['nr_plain_ecs_fields'] = 0
        summary['nr_otel_fields'] = 0
        summary['nr_matching_fields'] = 0
        summary['nr_equivalent_fields'] = 0
        summary['nr_related_fields'] = 0
        summary['nr_metric_fields'] = 0
        summary['nr_conflicting_fields'] = 0
        summary['nr_not_applicable_fields'] = 0
        summary['nr_otlp_fields'] = 0
        return summary

    def __count_otel_attributes(self, namespace) -> int:
        """Count the known OTel attributes whose name starts with ``<namespace>.``."""
        prefix = namespace + "."
        return sum(1 for name in self.attributes if name.startswith(prefix))

    def get_mapping_summaries(
        self,
        fieldsets: List[FieldNestedEntry],
    ) -> List[OTelMappingSummary]:
        """Build one mapping summary per namespace, sorted by namespace.

        Every ECS fieldset yields a summary counting its fields and the
        relation types of their OTel mappings. OTel namespaces (the part of an
        attribute name before the first dot) without an ECS fieldset of the
        same name get a summary as well, with all ECS related counters at zero.
        All summaries carry the same set of keys.
        """
        summaries: List[OTelMappingSummary] = []
        otel_namespaces = {attr.split('.')[0] for attr in self.attributes}

        for fieldset in fieldsets:
            namespace = fieldset['name']
            # Namespaces covered by an ECS fieldset are not reported a second
            # time as OTel-only namespaces below.
            otel_namespaces.discard(namespace)
            summary = self.__new_summary(namespace, fieldset['title'])

            for field in fieldset['fields'].values():
                summary['nr_all_ecs_fields'] += 1
                if 'original_fieldset' in field:
                    continue
                summary['nr_plain_ecs_fields'] += 1
                # NOTE(review): mappings are tallied only for fields without
                # 'original_fieldset' - confirm this nesting against the
                # original, indented source.
                for otel in field.get('otel', []):
                    counter = self._RELATION_COUNTERS.get(otel['relation'])
                    if counter is not None:
                        summary[counter] += 1

            summary['nr_otel_fields'] += self.__count_otel_attributes(namespace)
            summaries.append(summary)

        for otel_ns in otel_namespaces:
            title = otel_ns
            # Any attribute of the namespace serves as the source of the group
            # display name. There is none when the namespace stems from an
            # attribute name without a dot; the namespace itself is then used.
            ex_attr = next(
                (attr for attr in self.attributes.values() if attr['id'].startswith(otel_ns + ".")),
                None,
            )
            if ex_attr is not None and 'group_display_name' in ex_attr:
                disp_name = ex_attr['group_display_name']
                if disp_name.endswith(" Attributes"):
                    disp_name = disp_name[:-len(" Attributes")]
                title = disp_name

            summary = self.__new_summary(otel_ns, title)
            summary['nr_otel_fields'] = self.__count_otel_attributes(otel_ns)
            summaries.append(summary)

        return sorted(summaries, key=lambda s: s['namespace'])