def __init__()

in CommonLayerCode/datalake-library/python/datalake_library/octagon/metadata.py [0:0]


    def __init__(self, metadata_filename):
        """Load Octagon table/field metadata from a JSON file.

        The file is parsed as JSON and must contain a top-level
        ``"octagon_metadata"`` list. Each entry needs an ``"octagon_object"``
        name and a ``"fields_metadata"`` list; each field needs
        ``"attribute"`` and ``"type"``, while the boolean flags
        ``"partition_key"``, ``"sort_key"``, ``"generated"``, ``"mandatory"``
        and ``"composite"`` are optional and default to ``False``.

        The result is stored in ``self.table_meta``, a dict mapping each
        object name to its ``TableMeta``. If the same object name appears
        more than once, the later entry replaces the earlier one.

        Args:
            metadata_filename: Path to the JSON metadata file.

        Raises:
            ValueError: If ``metadata_filename`` is not an existing regular file.
            KeyError: If a required key is missing from the metadata.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        self.logger = logging.getLogger(__name__)

        if not os.path.isfile(metadata_filename):
            self.logger.error(f"Octagon metadata file is not found {metadata_filename}")
            raise ValueError("Metadata file is not found!")

        # Decode explicitly as UTF-8: without an encoding argument, open()
        # falls back to the host locale, so the same metadata file could
        # load on one machine and fail (or be mis-decoded) on another.
        with open(metadata_filename, "r", encoding="utf-8") as f:
            meta_dict = json.load(f)

        self.table_meta = {}

        for oct_obj in meta_dict["octagon_metadata"]:
            object_name = oct_obj["octagon_object"]
            tm = TableMeta(object_name)
            # Register the table before its fields are populated; `tm` is
            # filled in place by the inner loop below.
            self.table_meta[object_name] = tm
            self.logger.debug(f"Loading metadata for object {object_name}")

            for oct_field in oct_obj["fields_metadata"]:
                field_meta = FieldMeta(
                    attribute=oct_field["attribute"],
                    type=oct_field["type"],
                    # Optional flags: absent keys are treated as False.
                    partition_key=oct_field.get("partition_key", False),
                    sort_key=oct_field.get("sort_key", False),
                    generated=oct_field.get("generated", False),
                    mandatory=oct_field.get("mandatory", False),
                    composite=oct_field.get("composite", False),
                )
                tm.add_field_meta(field_meta)
            self.logger.debug(f"Loading metadata for object {object_name} DONE")