in src/loading_manifest/csv_to_json.py [0:0]
def _pop_tag(template, tag):
    """Remove directive *tag* from *template* and return its value (None if absent).

    Fixes the original code path that attempted ``del template[<value>]``
    (deleting by value rather than by key) for the required-template tag.
    """
    return template.pop(tag, None)


def _nest_under_parent(parent_path, kind_parent, leaf_value):
    """Build a nested dict following the dotted *parent_path*, placing
    *leaf_value* at the innermost key.

    When *kind_parent* is a non-empty string it is stored under ``'kind'``
    at the root of the returned dict.  Fixes the original group-scaffolding
    loop, which wrote every intermediate level onto the root dict instead of
    the current parent, breaking dotted paths deeper than one level.
    """
    root = dict()
    if kind_parent is not None and len(kind_parent) > 0:
        root['kind'] = kind_parent
    keys = [key.strip() for key in parent_path.split(".")]
    parent = root
    for key in keys[:-1]:
        parent[key] = dict()
        parent = parent[key]
    parent[keys[-1]] = leaf_value
    return root


def _dedupe_filename(output_file, processed_lower):
    """Return *output_file*, suffixed with ``_1``, ``_2``, ... until its
    lowercase form is absent from *processed_lower*.

    Comparison is case-insensitive because Windows file names are not
    case-sensitive.  The suffix is inserted before the last extension when
    the name contains a dot.
    """
    duplicate_name_count = 1
    while output_file.lower() in processed_lower:
        name_parts = output_file.split('.')
        if len(name_parts) > 1:
            name_parts[-2] = name_parts[-2] + '_' + str(duplicate_name_count)
            output_file = '.'.join(name_parts)
        else:
            output_file = output_file + '_' + str(duplicate_name_count)
        duplicate_name_count += 1
    return output_file


def create_manifest_from_csv(input_csv, template_json, output_path,
                             schema_path=None, schema_ns_name=None, schema_ns_value=None,
                             required_template=None, array_parent=None, object_parent=None,
                             group_filename=None):
    """Generate one JSON load manifest per data row of *input_csv* using
    *template_json* as the manifest template.

    :param input_csv: path to the CSV; the first row is treated as a header
        of column names mapped onto template parameters.
    :param template_json: manifest template; may embed control directives
        (schema id, required template, array/object/kind parents) that are
        stripped before generation.
    :param output_path: directory receiving the generated ``.json`` files.
    :param schema_path: optional directory of JSON schemas; when given
        together with a template schema id, each manifest is validated.
    :param schema_ns_name/schema_ns_value: optional schema namespace rename
        applied to kinds and manifest contents.
    :param required_template: optional JSON string overriding the template's
        embedded required-template directive.
    :param array_parent/object_parent: optional dotted paths overriding the
        template's embedded parent directives; each manifest is wrapped under
        that path in the output.
    :param group_filename: when given (and an array parent is available),
        all manifests are accumulated into this single grouped file instead
        of one file per row.

    Rows that fail generation or validation are logged and skipped; any
    partially written file for the failed row is removed best-effort.
    """
    with open(template_json, 'r') as fp:
        root_template = json.load(fp)

    # The embedded tags are control directives, not data: pop them so they
    # do not leak into the generated manifests.
    schema_id = _pop_tag(root_template, tag_schema_id)
    required_template_from_template = _pop_tag(root_template, tag_required_template)
    if required_template is not None and len(required_template) > 0:
        # Explicit argument wins over the template's embedded directive.
        required_template = json.loads(required_template)
    else:
        required_template = required_template_from_template

    output_array_parent = _pop_tag(root_template, tag_array_parent)
    if array_parent is not None and len(array_parent) > 0:
        output_array_parent = array_parent
    output_object_parent = _pop_tag(root_template, tag_object_parent)
    if object_parent is not None and len(object_parent) > 0:
        output_object_parent = object_parent
    output_kind_parent = _pop_tag(root_template, tag_kind_parent)
    # Guard against a missing kind parent (the original crashed here when a
    # namespace was supplied but the template declared no kind parent).
    if (schema_ns_name is not None and len(schema_ns_name) > 0
            and output_kind_parent is not None):
        output_kind_parent = output_kind_parent.replace(
            schema_ns_name + ":", schema_ns_value + ":")

    # Optional grouping: accumulate all manifests into one file.
    group_lm = None
    group_lm_parent = None
    output_group_filename = None
    if group_filename is not None and len(group_filename) > 0:
        if output_array_parent is None or len(output_array_parent) == 0:
            # Grouping needs an array parent to accumulate manifests into;
            # fall back to one file per row instead of crashing on None.
            logging.warning("Array parent is needed to group generated load manifests")
        else:
            if not group_filename.endswith('.json'):
                group_filename = group_filename + '.json'
            output_group_filename = os.path.join(output_path, group_filename)
            group_lm_parent = []  # shared list; manifests are appended per row
            group_lm = _nest_under_parent(output_array_parent, output_kind_parent,
                                          group_lm_parent)

    # Load schemas if available.
    do_validate = (schema_path is not None and len(schema_path) > 0
                   and schema_id is not None)
    dict_schemas = dict()
    schema = None
    wp_wpc_schema = None
    if do_validate:
        dict_schemas = load_schemas(schema_path, schema_ns_name, schema_ns_value)
        schema = dict_schemas.get(schema_id)
        if schema is None:
            logging.warning("No schema found for: %s", schema_id)
        else:
            # Work-product style schemas nest the payload under
            # properties.Data; validate against that sub-schema instead.
            wp_wpc_schema = schema.get('properties', {}).get('Data', None)
            if (wp_wpc_schema is not None
                    and wp_wpc_schema.get('properties', {}).get('WorkProduct') is not None):
                schema = wp_wpc_schema
                schema['$id'] = schema_id

    parameters_object = dict()
    parse_template_parameters(root_template, parameters_object)
    # Read csv column names and map them onto template parameters.
    map_parameter_column = map_csv_column_names_to_parameters(input_csv, parameters_object)

    # Output each csv row as one json file (or append to the group manifest).
    with open(input_csv, mode='r', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        next(reader, None)  # skip the header row
        row_count = 0
        processed = set()
        processed_lower = set()
        empty_row_count = 0
        for rows in reader:
            row_count += 1
            # Skip rows whose cells are all missing or blank.
            if all(cell is None or len(cell.strip()) == 0 for cell in rows):
                empty_row_count += 1
                continue
            # Default file name: <csv base name>_<row number>.json
            output_file = os.path.join(
                output_path,
                os.path.basename(input_csv)[:-4] + '_' + str(row_count) + '.json')
            try:
                lm = create_manifest_from_row(
                    root_template=root_template,
                    required_template=required_template,
                    parameters_object=parameters_object,
                    map_parameter_column=map_parameter_column,
                    data_row=rows
                )
                if schema_ns_name is not None and len(schema_ns_name) > 0:
                    lm = replace_json_namespace(lm, schema_ns_name + ":",
                                                schema_ns_value + ":")
                output_file_name = lm.get(tag_file_name)
                # Check for None BEFORE sanitizing (the original called
                # .replace() first and crashed when the tag was absent).
                if output_file_name is not None:
                    output_file_name = output_file_name.replace('/', '-').replace('\\', '-')
                    output_file = os.path.join(output_path, output_file_name)
                    del lm[tag_file_name]
                if output_file in processed:
                    logging.warning("Duplicate rows found. Row: %s, File name: %s",
                                    row_count, output_file_name)
                else:
                    # Windows file names are not case-sensitive, so dedupe
                    # against the lowercase set.
                    output_file = _dedupe_filename(output_file, processed_lower)
                if schema is not None:
                    resolver = jsonschema.RefResolver("", schema, store=dict_schemas)
                    to_be_validated = copy.deepcopy(lm)
                    if lm.get('Manifest', None) is not None:
                        # NOTE(review): this validates the live manifest, not
                        # the deep copy — preserved from the original.
                        to_be_validated = lm['Manifest']
                    if wp_wpc_schema is not None:
                        # Access WorkProduct/WorkProductComponents structure;
                        # a malformed manifest raises here and the row is
                        # skipped via the outer handler.
                        wp_data = (to_be_validated['WorkProduct'])['data']
                        # del wp_data['Components']  # intentionally kept in output
                        for wpc in to_be_validated['WorkProductComponents']:
                            wpc_data = wpc['data']
                            # del wpc_data['Datasets']  # intentionally kept in output
                    jsonschema.validate(to_be_validated, schema, resolver=resolver)
                if output_array_parent is not None:
                    if group_lm_parent is not None:
                        group_lm_parent.append(lm)
                    else:
                        lm = _nest_under_parent(output_array_parent,
                                                output_kind_parent, [lm])
                elif output_object_parent is not None:
                    lm = _nest_under_parent(output_object_parent,
                                            output_kind_parent, lm)
                if group_lm is None:
                    with open(output_file, "w") as f:
                        json.dump(obj=lm, fp=f, indent=4)
                processed.add(output_file)
                processed_lower.add(output_file.lower())
            except Exception:
                logging.exception("Unable to process data row: {}".format(row_count))
                # Best effort: remove a partially written file for this row.
                try:
                    os.remove(output_file)
                except OSError:
                    pass
    if group_lm is not None:
        with open(output_group_filename, "w") as f:
            json.dump(obj=group_lm, fp=f, indent=4)
    logging.info("Generated {} load manifests.".format(len(processed)))
    if empty_row_count > 0:
        logging.info("Skipped {} empty rows.".format(empty_row_count))