in parquet_flask/v1/ingest_aws_json.py [0:0]
def ingest(self):
    """
    Ingest an S3-hosted file into parquet and record it in the metadata table.

    Steps:
    - reject a replace-request for a file that was never ingested, and a
      plain ingest for a file that already has a metadata record
    - download the s3 file into the working directory
    - gunzip it if the object key ends with ``.gz``
    - verify the local checksum against the sha512 recorded for the s3 object
    - ingest to parquet, either synchronously (``wait_till_complete``) or in
      a detached background process
    - on failure, best-effort delete the downloaded local file

    :return: tuple - (json object, return code)
    """
    try:
        LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
        existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
        # A "replace" only makes sense for a file we have already ingested.
        if existing_record is None and self.__props.is_replacing is True:
            LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
            return {'message': 'unable to replace file as it is new'}, 500
        # A plain ingest must not silently overwrite an existing record.
        if existing_record is not None and self.__props.is_replacing is False:
            LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
            return {'message': 'unable to ingest file as it is already ingested'}, 500
        s3 = AwsS3().set_s3_url(self.__props.s3_url)
        LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
        FileUtils.mk_dir_p(self.__props.working_dir)
        self.__saved_file_name = s3.download(self.__props.working_dir)
        # Checksum is taken on the file as downloaded (i.e. the gzipped
        # bytes when the object is gzipped), before any decompression.
        self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
        if self.__saved_file_name.lower().endswith('.gz'):
            LOGGER.debug(f's3 file is in gzipped form. unzipping. {self.__saved_file_name}')
            self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
        self.__compare_sha512(self.__get_s3_sha512())
        if self.__props.wait_till_complete is True:
            return self.__execute_ingest_data()
        # Fire-and-forget: run ingestion in a daemon process so the request
        # returns immediately.  NOTE(review): a 204 response carries no body,
        # so the job_id payload is likely discarded by the framework; 202
        # would be more accurate, but the code is kept for caller compat.
        bg_process = Process(target=self.__execute_ingest_data, args=())
        bg_process.daemon = True
        bg_process.start()
        return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204
    except Exception as e:
        LOGGER.exception('deleting error file')
        # __saved_file_name is only assigned after a successful download;
        # guard the cleanup so an early failure (e.g. in the metadata lookup
        # or the download itself) does not raise AttributeError out of this
        # handler and mask the original error.
        try:
            FileUtils.del_file(self.__saved_file_name)
        except Exception:
            LOGGER.warning('unable to delete local file; it may not have been downloaded')
        return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500