parquet_flask/io_logic/replace_file.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from parquet_flask.io_logic.cdms_constants import CDMSConstants
from parquet_flask.io_logic.ingest_new_file import IngestNewJsonFile
from parquet_flask.io_logic.retrieve_spark_session import RetrieveSparkSession
from parquet_flask.io_logic.sanitize_record import SanitizeRecord
from parquet_flask.utils.config import Config
from parquet_flask.utils.file_utils import FileUtils
from parquet_flask.utils.time_utils import TimeUtils

LOGGER = logging.getLogger(__name__)


class ReplaceJsonFile:
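    """
    Replaces previously ingested observations with the contents of a new JSON file.

    The Spark writer runs in 'overwrite' mode with dynamic partition overwrite,
    so only the partitions touched by the new file are rewritten.
    """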
def __init__(self):
self.__sss = RetrieveSparkSession()
config = Config()
self.__app_name = config.get_spark_app_name()
self.__master_spark = config.get_value('master_spark_url')
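        # 'overwrite' replaces existing data; combined with dynamic partition overwrite
        # (set in ingest()), only the partitions present in the new file are rewritten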
self.__mode = 'overwrite'
        self.__parquet_name = config.get_value('parquet_file_name')

    def ingest(self, abs_file_path, job_id):
"""
This method will assume that incoming file has data with in_situ_schema file.
So, it will definitely has `time`, `project`, and `provider`.
:param abs_file_path:
:param job_id:
:return:
"""
if not FileUtils.file_exist(abs_file_path):
            raise ValueError('missing file to ingest. path: {}'.format(abs_file_path))
        LOGGER.debug('sanitizing the input file')
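        # sanitize and validate the incoming records against the configured in-situ JSON schema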
input_json = SanitizeRecord(Config().get_value('in_situ_schema')).start(abs_file_path)
spark_session = self.__sss.retrieve_spark_session(self.__app_name, self.__master_spark)
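        # dynamic partition overwrite: only the partitions present in the incoming DataFrame
        # are replaced; all other existing partitions are left untouched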
spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df_writer = IngestNewJsonFile.create_df(spark_session,
input_json[CDMSConstants.observations_key],
job_id,
input_json[CDMSConstants.provider_col],
input_json[CDMSConstants.project_col])
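        # overwrite the matching partitions of the Parquet store, GZIP-compressed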
df_writer.mode(self.__mode).parquet(self.__parquet_name, compression='GZIP')
        LOGGER.debug('finished writing parquet')
return len(input_json[CDMSConstants.observations_key])
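

# Example usage (a minimal sketch; the argument values are illustrative and the real
# caller lives elsewhere in the service):
#
#     replaced_count = ReplaceJsonFile().ingest('/tmp/new_observations.json', job_id='job-001')
#     LOGGER.info('replaced %s observations', replaced_count)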