in src/loading_manifest/common_manifest.py [0:0]
def create_loading_manifest(
        work_product,
        work_product_components,
        files,
        schema_id_lm,
        schema_id_wp,
        schema_id_wpc,
        schema_id_file,
        schema_ns_name,
        schema_ns_value,
        dict_schemas):
    """Assemble a loading manifest and optionally validate its parts.

    The manifest is a dict carrying the manifest kind, empty reference-data
    and master-data lists, and a work-product-data section that holds
    ``work_product``, ``work_product_components`` and ``files``.

    Args:
        work_product: Work product object placed in the manifest.
        work_product_components: Iterable of work product component objects.
        files: Iterable of dataset (file) objects.
        schema_id_lm: Schema id used to validate the whole manifest; skipped
            when empty or ``None``.
        schema_id_wp: Schema id used to validate the work product; skipped
            when empty or ``None``.
        schema_id_wpc: Schema id used to validate each work product
            component; skipped when empty or ``None``.
        schema_id_file: Schema id used to validate each file; skipped when
            empty or ``None``.
        schema_ns_name: Namespace placeholder. When non-empty,
            ``schema_ns_name + ":"`` and ``schema_ns_value + ":"`` are handed
            to ``replace_json_namespace``; when empty or ``None`` no
            substitution is attempted anywhere.
        schema_ns_value: Replacement for the placeholder; only used when
            ``schema_ns_name`` is non-empty.
        dict_schemas: Schemas passed through to ``validate_schema``. When
            empty or ``None`` no validation is performed at all.

    Returns:
        The manifest dict (the result of ``replace_json_namespace`` when a
        namespace is configured).

    Raises:
        KeyError: If an object being validated lacks one of the keys that is
            stripped before validation.
        Whatever ``validate_schema`` raises on a failed validation.
    """
    # Function-scope import keeps this block self-contained.
    import copy

    has_namespace = bool(schema_ns_name)

    def _validation_copy(node):
        """Return the object that will be pruned and validated in place of *node*."""
        if has_namespace:
            # NOTE(review): the original code deletes keys from this result,
            # so replace_json_namespace presumably returns an independent
            # copy rather than mutating *node* -- confirm against the helper.
            return replace_json_namespace(
                node, schema_ns_name + ":", schema_ns_value + ":")
        # Without a namespace there is nothing to substitute. Previously this
        # path evaluated `schema_ns_name + ":"` and raised TypeError for None.
        # A deep copy keeps the `del` statements below from stripping keys
        # out of the objects embedded in the returned manifest.
        return copy.deepcopy(node)

    wp_data = {
        LoadingManifest.TAG_WORK_PRODUCT: work_product,
        LoadingManifest.TAG_WORK_PRODUCT_COMPONENTS: work_product_components,
        LoadingManifest.TAG_DATASETS: files,
    }
    lm_group = {
        LoadingManifest.TAG_KIND: LoadingManifest.KIND_ID_1_0_0,
        LoadingManifest.TAG_REFERENCE_DATA: [],
        LoadingManifest.TAG_MASTER_DATA: [],
        LoadingManifest.TAG_WP_DATA: wp_data,
    }

    if has_namespace:
        lm_group = replace_json_namespace(
            lm_group, schema_ns_name + ":", schema_ns_value + ":")

    # No schemas supplied: nothing to validate against.
    if not dict_schemas:
        return lm_group

    if schema_id_lm:
        validate_schema(lm_group, schema_id_lm, dict_schemas)

    if schema_id_wp:
        wp = _validation_copy(work_product)
        # The component list is removed before the work product is validated.
        del wp[WorkProductManifest.TAG_DATA][WorkProductManifest.TAG_DATA_COMPONENTS]
        validate_schema(wp, schema_id_wp, dict_schemas)

    if schema_id_wpc:
        for component in work_product_components:
            wpc = _validation_copy(component)
            # The id and the dataset list are removed before validation.
            del wpc[WorkProductComponentManifest.TAG_ID]
            wpc_data = wpc[WorkProductComponentManifest.TAG_DATA]
            del wpc_data[WorkProductComponentManifest.TAG_DATA_DATASETS]
            validate_schema(wpc, schema_id_wpc, dict_schemas)

    if schema_id_file:
        for dataset in files:
            file_copy = _validation_copy(dataset)
            # The id is removed before validation.
            del file_copy[FileManifest.TAG_ID]
            validate_schema(file_copy, schema_id_file, dict_schemas)

    return lm_group