in parquet_flask/io_logic/ingest_new_file.py [0:0]
def ingest(self, abs_file_path, job_id):
    """
    This method assumes the incoming file contains data conforming to the in_situ_schema file,
    so it is guaranteed to have `time`, `project`, and `provider`.

    :param abs_file_path: absolute path of the JSON file to ingest
    :param job_id: identifier of the ingestion job
    :return: int - number of ingested records
    """
    if not FileUtils.file_exist(abs_file_path):
        raise ValueError('missing file to ingest. path: {}'.format(abs_file_path))
    LOGGER.debug(f'sanitizing the file?: {self.__sanitize_record}')
    if self.__sanitize_record is True:
        # validate and clean the incoming records against the configured in-situ JSON schema
        input_json = SanitizeRecord(Config().get_value('in_situ_schema')).start(abs_file_path)
    else:
        input_json = FileUtils.read_json(abs_file_path)
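    # Coerce selected fields to float in case they arrive as strings or integers,
    # so the parquet column types stay consistent across ingested files.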
    for each_record in input_json[CDMSConstants.observations_key]:
        if 'depth' in each_record:
            each_record['depth'] = float(each_record['depth'])
        if 'wind_from_direction' in each_record:
            each_record['wind_from_direction'] = float(each_record['wind_from_direction'])
        if 'wind_to_direction' in each_record:
            each_record['wind_to_direction'] = float(each_record['wind_to_direction'])
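    # Build a Spark DataFrame from the observation records, tagging each row with the
    # job id, provider, and project taken from the file, then write it out as parquet.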
    df_writer = self.create_df(
        self.__sss.retrieve_spark_session(
            self.__app_name,
            self.__master_spark),
        input_json[CDMSConstants.observations_key],
        job_id,
        input_json[CDMSConstants.provider_col],
        input_json[CDMSConstants.project_col])
    df_writer.mode(self.__mode).parquet(self.__parquet_name, compression='GZIP')  # alternative codec: snappy
    LOGGER.debug('finished writing parquet')
    return len(input_json[CDMSConstants.observations_key])
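# Usage sketch (hypothetical; the enclosing class name and its constructor
# arguments are assumptions and are not shown in this excerpt):
#
#   ingester = IngestNewFile()                                        # hypothetical instantiation
#   count = ingester.ingest('/tmp/insitu_observations.json', 'job-0001')
#   LOGGER.info(f'ingested {count} records')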