in assets/large_language_models/utils/ComponentVersionUpdator.py [0:0]
def parse_asset_yamls(files: List[str], root: str) -> Component:
    """Build a Component or Pipeline from the asset/spec yaml files of a folder.

    Files whose lower-cased name starts with ``asset.y`` or ``spec.y`` are
    parsed with ``parse_yaml``; every other file is ignored. For each parsed
    file, every key of ``target_fields`` that is present in the yaml is run
    through its extractor and stored, and every key of ``validate_fields``
    that is present is handed to its validator together with the collected
    component name. String values containing ``{{`` are skipped by both
    passes (presumably unresolved template placeholders -- confirm).

    Args:
        files: File names, relative to ``root``, to inspect.
        root: Directory the file names are joined to.

    Returns:
        A ``Pipeline`` when the asset type is ``component`` and a non-empty
        ``jobs`` value was extracted, a ``Component`` when the type is
        ``component`` without jobs, and ``None`` when nothing was collected
        or the asset type is ``environment``.

    Raises:
        KeyError: If an entry the function reads back (``type``, ``name``,
            ``version``, ``version_file``) was never collected.
        Exception: If the asset type is neither ``component`` nor
            ``environment``.
    """

    def _is_usable(value) -> bool:
        # Non-string values are always accepted; strings only when they do
        # not contain "{{".
        return not isinstance(value, str) or r"{{" not in value

    comp = {}
    for file in files:
        lowered = file.lower()
        is_asset = lowered.startswith("asset.y")
        if not is_asset and not lowered.startswith("spec.y"):
            continue

        file_path = os.path.join(root, file)
        yml = parse_yaml(file_path)
        if is_asset:
            # Only the asset file carries the asset type.
            comp["type"] = yml["type"]

        for field in target_fields:
            if field in yml and _is_usable(yml[field]):
                extractor = target_fields[field]
                comp[field] = extractor(yml[field])
                if field == "version":
                    # Remember which file holds the version string.
                    comp["version_file"] = file_path

        for field in validate_fields:
            if field in yml and _is_usable(yml[field]):
                validate_fields[field](comp["name"], yml[field])

    if not comp:
        return None

    asset_type = comp["type"]
    if asset_type == "component":
        jobs = comp.get("jobs")
        if not jobs:
            return Component("", comp["name"], comp["version"], comp["version_file"])
        return Pipeline(
            "", comp["name"], comp["version"], comp["version_file"], jobs
        )
    if asset_type == "environment":
        return None
    # Same exception type as before, but say what was wrong and where.
    raise Exception(f"Unsupported asset type {asset_type!r} found under {root!r}")