def _process_schema()

in tools/bqtag/bqtag/bqtag.py [0:0]


    def _process_schema(self, table_schema: str, table_tag_map: dict) -> None:
        """
        Process the untagged schema and convert it into tagged one by.
        adding tags to relevant columns using table_tag_map

        :param table_schema: Untagged schema of the table
        :param table_tag_map: Mapping of tags to columns
        :return: None
        """
        json_schema = json.loads(table_schema)

        # Check if rable_tag_map is empty
        if not table_tag_map:
            return json_schema

        default_column_tag = ""

        for tag_column, tag in table_tag_map.items():

            # If tag column is default_column_tag then skip the loop
            if tag_column == "default_column_tag":
                default_column_tag = tag
                continue

            tag_columns = tag_column.split(".")

            # If tag does not exist then continue
            if tag.lower() not in self.policy_tags:
                continue

            cur_level = json_schema
            cur_column = {}

            # Will remain true if all columns in column depth are found
            found_all = True

            column_number = 0

            # Loop over all the depths of column
            for tag_column2 in tag_columns:

                column_number += 1
                found = False

                # Loop over levels of schema to get to teight depth
                for column in cur_level:

                    if column["name"] == tag_column2:
                        found = True
                        if "fields" in column:
                            cur_level = column["fields"]
                        else:
                            if column_number != len(tag_columns):
                                found = False
                        cur_column = column
                        break

                if not found:
                    found_all = False
                    break

            if found_all:
                if cur_column["type"].upper() != "RECORD":
                    if "policyTags" not in cur_column:
                        cur_column["policyTags"] = {"names": []}
                    cur_column["policyTags"]["names"].append(
                        self.policy_tags[tag.lower()]
                    )

        # Add default column tag if present
        if default_column_tag.strip() != "":
            default_column_tag = default_column_tag.strip().lower()
            if default_column_tag in self.policy_tags:
                for column in json_schema:
                    self._process_default_tag(
                        column=column, tag=self.policy_tags[default_column_tag]
                    )

        return json_schema