in csv-to-neptune-bulk-format/data_config.py [0:0]
def __init__(self, fileName, gen_dup_file, local_enc='utf-8'):
    """Load the JSON configuration file and make sure the work folders exist.

    Args:
        fileName: Path of the JSON configuration file to read.
        gen_dup_file: Value stored on ``BaseDef.gen_dup_file``. It is
            assigned to the class attribute, not to this instance.
        local_enc: Text encoding used to open the configuration file.

    Raises:
        Exception: If the configuration file cannot be opened or parsed,
            or if the source/data folders cannot be created. The
            underlying error is chained as the cause.
        KeyError: If the loaded configuration has no 'fileNames', 'nodes'
            or 'edges' entry.
    """
    super().__init__()
    self.conf_file_name = fileName
    # Set on BaseDef itself rather than on self.
    BaseDef.gen_dup_file = gen_dup_file
    self.local_enc = local_enc
    self.output_files = []
    self.output_file_names = []
    self.node_writers = {}
    self.edge_writers = {}

    try:
        # The with-statement closes the file; no explicit close() is needed.
        with open(self.conf_file_name, "r", encoding=self.local_enc) as json_file:
            self.data = json.load(json_file)
    except Exception as ex:
        raise Exception(f'Unable to load the JSON file: {self.conf_file_name} \nexception: {str(ex)}') from ex

    config = self.data

    # Optional settings fall back to defaults; 'fileNames' is mandatory.
    self.source_folder = config.get('source_folder', '.source')
    self.data_folder = config.get('data_folder', '.data')
    self.file_names = config['fileNames']
    self.s3_bucket = config.get('s3_bucket', '')
    self.s3_conf_folder = config.get('s3_conf_folder', '')
    self.s3_source_folder = config.get('s3_source_folder', '')
    self.s3_data_folder = config.get('s3_data_folder', '')

    # Wrap every raw node/edge dictionary in its definition object.
    self.nodeDefs = [NodeDef(dict_node) for dict_node in config['nodes']]
    self.edgeDefs = [EdgeDef(dict_edge) for dict_edge in config['edges']]
    logger.debug(f'Loaded Configration from: {self.conf_file_name}')

    # Make source and data folders; both are created under './'.
    try:
        os.makedirs('./' + self.source_folder, exist_ok=True)
        os.makedirs('./' + self.data_folder, exist_ok=True)
        logger.debug(f'Making sure folders exist: {self.source_folder} {self.data_folder}')
    except Exception as ex:
        raise Exception(f'Unable to make folders: {self.source_folder} {self.data_folder} \nexception: {str(ex)}') from ex