in src/loading_manifest/common_manifest.py [0:0]
def load_schemas(schema_path, schema_ns_name=None, schema_ns_value=None):
    """Load every ``*.json`` schema found under *schema_path*.

    The directory tree is searched recursively. Each parsed file is stored
    under its ``$id`` value (falling back to ``$ID`` when ``$id`` is missing
    or ``None``). Files that declare neither key, or whose top-level JSON
    value is not an object, are skipped.

    For every id whose last ``/``-separated segment parses as an integer
    version, an extra alias entry is added: the id with the version removed
    but the trailing ``/`` kept (``"a/b/3"`` -> ``"a/b/"``). The alias refers
    to the same schema object as the highest version found for that prefix.

    Args:
        schema_path: Root directory to search for ``.json`` files.
        schema_ns_name: Optional namespace name. When non-empty, each loaded
            schema is passed through ``replace_json_namespace`` with
            ``schema_ns_name + ":"`` and ``schema_ns_value + ":"``.
        schema_ns_value: Namespace value used together with
            ``schema_ns_name``; only read when ``schema_ns_name`` is
            non-empty.

    Returns:
        A dict mapping schema ids (and the "latest version" aliases) to the
        parsed schema objects.
    """

    def list_schema_files(path, a_file_list):
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # load order deterministic, so that when two files declare the same
        # id the winner does not depend on the filesystem.
        for entry in sorted(os.listdir(path)):
            full_path = os.path.join(path, entry)
            if os.path.isfile(full_path):
                if entry.endswith('.json'):
                    a_file_list.append(full_path)
            elif os.path.isdir(full_path):
                list_schema_files(full_path, a_file_list)

    dict_schemas = {}

    # load all json files
    file_list = []
    list_schema_files(schema_path, file_list)
    for schema_file in file_list:
        # JSON text is UTF-8; do not rely on the platform default encoding.
        with open(schema_file, 'r', encoding='utf-8') as fp:
            a_schema = json.load(fp)
        if schema_ns_name:
            a_schema = replace_json_namespace(a_schema, schema_ns_name+":", schema_ns_value+":")
        if not isinstance(a_schema, dict):
            # A top-level array/scalar cannot carry an id; skip it the same
            # way id-less objects are skipped instead of crashing on .get().
            continue
        a_id = a_schema.get('$id')
        if a_id is None:
            a_id = a_schema.get('$ID')
        if a_id is not None:
            dict_schemas[a_id] = a_schema

    # resolve latest version
    dict_latest_key = {}
    dict_latest_version = {}
    for key in dict_schemas:
        # strip the version at the end
        key_parts = key.split('/')
        if len(key_parts) < 2:
            continue
        try:
            key_version = int(key_parts[-1])
        except ValueError:
            # Last segment is not a version number: no alias for this id.
            continue
        key_latest_id = '/'.join(key_parts[:-1]) + '/'
        previous_key_version = dict_latest_version.get(key_latest_id)
        if previous_key_version is None or key_version > previous_key_version:
            dict_latest_version[key_latest_id] = key_version
            dict_latest_key[key_latest_id] = key

    # Aliases are added only after the scan so the dict is not mutated while
    # it is being iterated.
    for latest_key, key in dict_latest_key.items():
        dict_schemas[latest_key] = dict_schemas[key]
    return dict_schemas