in tools/bqtag/bqtag/bqtag.py [0:0]
def _process_schema(self, table_schema: str, table_tag_map: dict) -> None:
"""
Process the untagged schema and convert it into tagged one by.
adding tags to relevant columns using table_tag_map
:param table_schema: Untagged schema of the table
:param table_tag_map: Mapping of tags to columns
:return: None
"""
json_schema = json.loads(table_schema)
# Check if rable_tag_map is empty
if not table_tag_map:
return json_schema
default_column_tag = ""
for tag_column, tag in table_tag_map.items():
# If tag column is default_column_tag then skip the loop
if tag_column == "default_column_tag":
default_column_tag = tag
continue
tag_columns = tag_column.split(".")
# If tag does not exist then continue
if tag.lower() not in self.policy_tags:
continue
cur_level = json_schema
cur_column = {}
# Will remain true if all columns in column depth are found
found_all = True
column_number = 0
# Loop over all the depths of column
for tag_column2 in tag_columns:
column_number += 1
found = False
# Loop over levels of schema to get to teight depth
for column in cur_level:
if column["name"] == tag_column2:
found = True
if "fields" in column:
cur_level = column["fields"]
else:
if column_number != len(tag_columns):
found = False
cur_column = column
break
if not found:
found_all = False
break
if found_all:
if cur_column["type"].upper() != "RECORD":
if "policyTags" not in cur_column:
cur_column["policyTags"] = {"names": []}
cur_column["policyTags"]["names"].append(
self.policy_tags[tag.lower()]
)
# Add default column tag if present
if default_column_tag.strip() != "":
default_column_tag = default_column_tag.strip().lower()
if default_column_tag in self.policy_tags:
for column in json_schema:
self._process_default_tag(
column=column, tag=self.policy_tags[default_column_tag]
)
return json_schema