def ingest()

in parquet_flask/v1/ingest_aws_json.py [0:0]


    def ingest(self):
        """
        Download the S3 object described by the props and start ingesting it.

        Steps performed directly by this method:
        - look up any existing record for the s3 url and reject the request
          when it conflicts with the `is_replacing` flag
        - download s3 file into the working dir
        - compute the checksum of the file exactly as downloaded
        - unzip if needed (file name ends with `.gz`, case-insensitive)
        - compare the sha512 obtained via `__get_s3_sha512` using `__compare_sha512`
        - run `__execute_ingest_data` either inline or in a background process,
          depending on `wait_till_complete`

        The remaining steps (ingest to parquet, update to metadata tbl,
        delete local file, tag s3 object) are presumably done by
        `__execute_ingest_data`, which is not visible here - TODO confirm.

        Any exception raised along the way is logged and converted into a
        500 response; the local file is removed on a best-effort basis.

        :return: tuple - (json object, return code)
        """
        try:
            LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
            existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
            # Replacing only makes sense when the file has been ingested before.
            if existing_record is None and self.__props.is_replacing is True:
                LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
                return {'message': 'unable to replace file as it is new'}, 500

            # A plain (non-replacing) ingest must not duplicate an existing record.
            if existing_record is not None and self.__props.is_replacing is False:
                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
                return {'message': 'unable to ingest file as it is already ingested'}, 500

            s3 = AwsS3().set_s3_url(self.__props.s3_url)
            LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
            FileUtils.mk_dir_p(self.__props.working_dir)
            self.__saved_file_name = s3.download(self.__props.working_dir)
            # Checksum is taken before any gunzip, i.e. on the bytes as downloaded.
            self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
            if self.__saved_file_name.lower().endswith('.gz'):
                LOGGER.debug(f's3 file is in gzipped form. unzipping. {self.__saved_file_name}')
                # From here on the instance tracks the unzipped file.
                self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
            self.__compare_sha512(self.__get_s3_sha512())
            if self.__props.wait_till_complete is True:
                return self.__execute_ingest_data()

            # Fire-and-forget: run the ingestion in a daemon process and
            # return immediately with the job id.
            bg_process = Process(target=self.__execute_ingest_data, args=())
            bg_process.daemon = True
            bg_process.start()
            # NOTE(review): 204 responses conventionally carry no body; 202 may
            # have been intended. Kept as-is so existing callers are unaffected.
            return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204
        except Exception as e:
            LOGGER.exception('deleting error file')
            # The failure may have happened before anything was downloaded, and
            # the cleanup itself can fail. Neither case may mask the original
            # error or break the (json object, return code) contract.
            try:
                if self.__saved_file_name is not None:
                    FileUtils.del_file(self.__saved_file_name)
            except Exception:
                LOGGER.exception('unable to delete local file while handling ingest error')
            return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500