in csv-to-neptune-bulk-format/csv_converter.py [0:0]
def convert_to_csv(self):
    """Convert every configured source CSV file into Neptune bulk-load CSV format.

    For each configuration definition in ``self.conf_defs``:
      * initializes that definition's output writers,
      * reads each source file (downloaded from S3 when ``self.use_s3`` is set,
        otherwise opened from ``conf_def.source_folder``),
      * streams the rows through ``conf_def.process_csv_to_csv``,
      * closes the writers, and (in S3 mode) uploads the generated files —
        deleting previously generated data files once, on the first definition.

    Per-file failures are logged and processing continues with the next file
    (best-effort semantics). Finally, duplicate-row files are written when
    ``self.gen_dup_file`` is enabled, and accumulated statistics are logged
    and reset on ``data_config.BaseDef``.
    """
    for conf_index, conf_def in enumerate(self.conf_defs):
        data_file_names = conf_def.file_names
        # Prepare output writers for this configuration definition.
        conf_def.init_writers()
        for file_index, data_file_name in enumerate(data_file_names):
            try:
                if self.use_s3:
                    # Returns the local path of the downloaded copy.
                    data_file_name = conf_def.download_source_file(self.s3, data_file_name)
                else:
                    data_file_name = conf_def.source_folder + '/' + data_file_name
                logger.info(f'Processing Data File:{file_index}:{data_file_name}')
                # The context manager closes the file even if processing
                # raises, so no explicit close() call is needed.
                with open(data_file_name, newline='', encoding=self.local_enc) as csv_file:
                    reader = csv.DictReader(csv_file, escapechar="\\")
                    # Process the file; chain the original exception so the
                    # underlying traceback is preserved for debugging.
                    try:
                        conf_def.process_csv_to_csv(reader)
                    except Exception as ex:
                        raise Exception(f'Unable to process the CSV file: {data_file_name} \nexception: {str(ex)}') from ex
            except Exception as ex:
                # Best-effort: log the failure and continue with the next file.
                logger.error(f'Unable to load the CSV file: {data_file_name} \nexception: {str(ex)}')
        # Close writers even when some files failed so partial output is flushed.
        conf_def.close_writers()
        # Delete previously generated files only once (first definition),
        # then upload the files produced for this definition.
        if self.use_s3:
            if conf_index == 0:
                conf_def.delete_data_files(self.s3)
            conf_def.upload_data_files(self.s3)
    if self.gen_dup_file:
        data_config.BaseDef.write_dup_files()
    # Log the accumulated conversion statistics, then reset them.
    data_config.BaseDef.log_stats()
    data_config.BaseDef.clean_stats()