in parquet_flask/v1/ingest_aws_json.py [0:0]
def __execute_ingest_data(self):
    try:
        LOGGER.debug(f'ingesting file: {self.__saved_file_name}')
        start_time = TimeUtils.get_current_time_unix()
        # Ingest the staged JSON file into Parquet, optionally sanitizing
        # records and replacing any previously ingested copy.
        ingest_new_file = IngestNewJsonFile(self.__props.is_replacing)
        ingest_new_file.sanitize_record = self.__props.is_sanitizing
        num_records = ingest_new_file.ingest(self.__saved_file_name, self.__props.uuid)
        end_time = TimeUtils.get_current_time_unix()
        LOGGER.debug('uploading to metadata table')
        # Audit record for the metadata table: provenance, checksum
        # outcome, timing, and record count for this job.
        new_record = {
            CDMSConstants.s3_url_key: self.__props.s3_url,
            CDMSConstants.uuid_key: self.__props.uuid,
            CDMSConstants.ingested_date_key: self.__ingested_date,
            CDMSConstants.file_size_key: FileUtils.get_size(self.__saved_file_name),
            CDMSConstants.checksum_key: self.__file_sha512,
            CDMSConstants.checksum_validation: self.__sha512_result,
            CDMSConstants.checksum_cause: self.__sha512_cause,
            CDMSConstants.job_start_key: start_time,
            CDMSConstants.job_end_key: end_time,
            CDMSConstants.records_count_key: num_records,
        }
        if self.__props.is_replacing:
            self.__db_io.replace_record(new_record)
        else:
            self.__db_io.insert_record(new_record)
        LOGGER.debug('deleting used file')
        FileUtils.del_file(self.__saved_file_name)
        # TODO: make tagging a background process?
        LOGGER.warning('Disabled tagging S3 due to IAM issues')
        # LOGGER.debug('tagging s3')
        # s3.add_tags_to_obj({
        #     'parquet_ingested': TimeUtils.get_time_str(self.__ingested_date),
        #     'job_id': self.__props.uuid,
        # })
    except Exception as e:
        # On any failure, clean up the staged file before reporting the error.
        LOGGER.exception('deleting error file')
        FileUtils.del_file(self.__saved_file_name)
        return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
    if self.__sha512_result is True:
        return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
    # Checksum mismatch: the file was ingested anyway, but the 203 status
    # and 'cause' flag the discrepancy to the caller.
    return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
            'job_id': self.__props.uuid}, 203
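The __file_sha512, __sha512_result, and __sha512_cause values recorded above come from a validation step that runs before this method and is not shown here. A minimal sketch of how such a SHA-512 check is typically computed with the standard hashlib module; sha512_of_file and the comparison names are hypothetical, not the project's API:

import hashlib

def sha512_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    # Stream the file in chunks so large uploads are never held in memory.
    digest = hashlib.sha512()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()

# sha512_result = (sha512_of_file(saved_file_name) == caller_supplied_sha512)
# sha512_cause would then describe the mismatch when sha512_result is False.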
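Each return above is a (body, status) pair. A minimal sketch, not the project's actual wiring, of how such a pair maps onto an HTTP response in Flask, which converts (dict, status) tuples into JSON responses with that status code; the /ingest route and handler name are hypothetical:

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/ingest', methods=['PUT'])
def ingest():
    # Stand-in for __execute_ingest_data(): on success it would yield
    # ({'message': 'ingested', 'job_id': ...}, 201).
    body, status = {'message': 'ingested', 'job_id': 'example-uuid'}, 201
    return jsonify(body), status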