# parquet_flask/v1/ingest_aws_json.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import uuid
from multiprocessing import Process
from parquet_flask.aws.es_abstract import ESAbstract
from parquet_flask.aws.es_factory import ESFactory
from parquet_flask.io_logic.metadata_tbl_es import MetadataTblES
from parquet_flask.aws.aws_s3 import AwsS3
from parquet_flask.io_logic.cdms_constants import CDMSConstants
from parquet_flask.io_logic.ingest_new_file import IngestNewJsonFile
from parquet_flask.io_logic.metadata_tbl_interface import MetadataTblInterface
from parquet_flask.utils.config import Config
from parquet_flask.utils.file_utils import FileUtils
from parquet_flask.utils.time_utils import TimeUtils
LOGGER = logging.getLogger(__name__)
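

# Value object holding the ingestion parameters (S3 URLs, job UUID, scratch
# directory, and behavioural flags) consumed by IngestAwsJson below.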
class IngestAwsJsonProps:
    def __init__(self):
        self.__s3_url = None
        self.__s3_sha_url = None
        self.__uuid = str(uuid.uuid4())
        self.__working_dir = f'/tmp/{uuid.uuid4()}'
        self.__is_replacing = False
        self.__is_sanitizing = True
        self.__wait_till_complete = True

    @property
    def wait_till_complete(self):
        return self.__wait_till_complete

    @wait_till_complete.setter
    def wait_till_complete(self, val):
        self.__wait_till_complete = val

    @property
    def is_sanitizing(self):
        return self.__is_sanitizing

    @is_sanitizing.setter
    def is_sanitizing(self, val):
        self.__is_sanitizing = val

    @property
    def s3_sha_url(self):
        return self.__s3_sha_url

    @s3_sha_url.setter
    def s3_sha_url(self, val):
        self.__s3_sha_url = val

    @property
    def is_replacing(self):
        return self.__is_replacing

    @is_replacing.setter
    def is_replacing(self, val):
        self.__is_replacing = val

    @property
    def working_dir(self):
        return self.__working_dir

    @working_dir.setter
    def working_dir(self, val):
        self.__working_dir = val

    @property
    def s3_url(self):
        return self.__s3_url

    @s3_url.setter
    def s3_url(self, val):
        self.__s3_url = val

    @property
    def uuid(self):
        return self.__uuid

    @uuid.setter
    def uuid(self, val):
        self.__uuid = val


class IngestAwsJson:
    def __init__(self, props: IngestAwsJsonProps = None):
        # build a fresh props object per instance; a default argument of
        # IngestAwsJsonProps() would be evaluated once and shared across calls
        self.__props = props if props is not None else IngestAwsJsonProps()
self.__saved_file_name = None
self.__ingested_date = TimeUtils.get_current_time_unix()
self.__file_sha512 = None
self.__sha512_result = None
self.__sha512_cause = None
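        # wire up the Elasticsearch-backed metadata table from runtime config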
config = Config()
es_url = config.get_value(Config.es_url)
es_port = int(config.get_value(Config.es_port, '443'))
self.__es: ESAbstract = ESFactory().get_instance('AWS', index='', base_url=es_url, port=es_port)
self.__db_io: MetadataTblInterface = MetadataTblES(self.__es)
def __get_s3_sha512(self):
"""
sha512 file is in this format
<sha-512><space or tab><s3 json filename>
:return:
"""
if self.__props.s3_sha_url is None:
            LOGGER.warning('s3_sha_url is None. deriving it from s3_url')
self.__props.s3_sha_url = f'{self.__props.s3_url}.sha512'
s3 = AwsS3().set_s3_url(self.__props.s3_sha_url)
        try:
            s3.get_s3_obj_size()  # existence check: expected to raise when the sidecar is missing
            sha512_content = s3.read_small_txt_file()
            return sha512_content.replace(os.path.basename(self.__props.s3_url), '').strip()
        except Exception:
            LOGGER.exception(f'cannot read sha512 file at {self.__props.s3_sha_url}')
            return None
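
    # __compare_sha512 records the verification outcome for the metadata table:
    #   missing sidecar -> (False, 'missing S3 sha512')
    #   digests match   -> (True, '')
    #   digests differ  -> (False, 'mismatched sha512: <s3> vs <local>')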
def __compare_sha512(self, s3_sha512):
if s3_sha512 is None:
self.__sha512_result = False
self.__sha512_cause = 'missing S3 sha512'
return
if s3_sha512 == self.__file_sha512:
self.__sha512_result = True
self.__sha512_cause = ''
return
self.__sha512_result = False
        self.__sha512_cause = f'mismatched sha512: {s3_sha512} vs {self.__file_sha512}'
def __execute_ingest_data(self):
try:
LOGGER.debug(f'ingesting file: {self.__saved_file_name}')
start_time = TimeUtils.get_current_time_unix()
ingest_new_file = IngestNewJsonFile(self.__props.is_replacing)
ingest_new_file.sanitize_record = self.__props.is_sanitizing
num_records = ingest_new_file.ingest(self.__saved_file_name, self.__props.uuid)
end_time = TimeUtils.get_current_time_unix()
            LOGGER.debug('uploading to metadata table')
new_record = {
CDMSConstants.s3_url_key: self.__props.s3_url,
CDMSConstants.uuid_key: self.__props.uuid,
CDMSConstants.ingested_date_key: self.__ingested_date,
CDMSConstants.file_size_key: FileUtils.get_size(self.__saved_file_name),
CDMSConstants.checksum_key: self.__file_sha512,
CDMSConstants.checksum_validation: self.__sha512_result,
CDMSConstants.checksum_cause: self.__sha512_cause,
CDMSConstants.job_start_key: start_time,
CDMSConstants.job_end_key: end_time,
CDMSConstants.records_count_key: num_records,
}
if self.__props.is_replacing:
self.__db_io.replace_record(new_record)
else:
self.__db_io.insert_record(new_record)
            LOGGER.debug('deleting used file')
FileUtils.del_file(self.__saved_file_name)
# TODO make it background process?
LOGGER.warning('Disabled tagging S3 due to IAM issues')
# LOGGER.debug(f'tagging s3')
# s3.add_tags_to_obj({
# 'parquet_ingested': TimeUtils.get_time_str(self.__ingested_date),
# 'job_id': self.__props.uuid,
# })
        except Exception as e:
            LOGGER.exception('ingestion failed. deleting downloaded file')
            FileUtils.del_file(self.__saved_file_name)
            return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
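        # 201 when the checksum verified; 203 (Non-Authoritative Information)
        # flags a successful ingest whose checksum was missing or mismatched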
if self.__sha512_result is True:
return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
'job_id': self.__props.uuid}, 203
def ingest(self):
"""
- download s3 file
- unzip if needed
- ingest to parquet
- update to metadata tbl
- delete local file
        - tag s3 object (currently disabled due to IAM issues)
:return: tuple - (json object, return code)
"""
try:
LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
if existing_record is None and self.__props.is_replacing is True:
LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
return {'message': 'unable to replace file as it is new'}, 500
if existing_record is not None and self.__props.is_replacing is False:
LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
return {'message': 'unable to ingest file as it is already ingested'}, 500
s3 = AwsS3().set_s3_url(self.__props.s3_url)
            LOGGER.debug(f'downloading s3 file for job {self.__props.uuid}')
FileUtils.mk_dir_p(self.__props.working_dir)
self.__saved_file_name = s3.download(self.__props.working_dir)
self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
if self.__saved_file_name.lower().endswith('.gz'):
LOGGER.debug(f's3 file is in gzipped form. unzipping. {self.__saved_file_name}')
self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
self.__compare_sha512(self.__get_s3_sha512())
if self.__props.wait_till_complete is True:
return self.__execute_ingest_data()
else:
bg_process = Process(target=self.__execute_ingest_data, args=())
bg_process.daemon = True
bg_process.start()
                # 202 Accepted: a body is returned while work continues in the
                # background (204 No Content must not carry a response body)
                return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 202
        except Exception as e:
            LOGGER.exception('ingestion failed. cleaning up downloaded file')
            if self.__saved_file_name is not None:  # the failure may precede the download
                FileUtils.del_file(self.__saved_file_name)
            return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
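

# Minimal usage sketch (illustration only, not part of the service). It assumes
# the settings Config() reads (ES endpoint/port, AWS credentials) are present in
# the environment; the bucket and key below are hypothetical.
if __name__ == '__main__':
    example_props = IngestAwsJsonProps()
    example_props.s3_url = 's3://my-bucket/insitu/observations.json.gz'  # hypothetical object
    example_props.wait_till_complete = True
    response_body, status_code = IngestAwsJson(example_props).ingest()
    print(status_code, response_body)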