def ingest()

in parquet_flask/io_logic/ingest_new_file.py [0:0]


    def ingest(self, abs_file_path, job_id):
        """
        This method will assume that incoming file has data with in_situ_schema file.

        So, it will definitely has `time`, `project`, and `provider`.

        :param abs_file_path:
        :param job_id:
        :return: int - number of records
        """
        if not FileUtils.file_exist(abs_file_path):
            raise ValueError('missing file to ingest. path: {}'.format(abs_file_path))
        LOGGER.debug(f'sanitizing the file?: {self.__sanitize_record}')
        if self.__sanitize_record is True:
            input_json = SanitizeRecord(Config().get_value('in_situ_schema')).start(abs_file_path)
        else:
            input_json = FileUtils.read_json(abs_file_path)
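        # cast known numeric fields to float (likely to keep the types consistent across records before building the DataFrame)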
        for each_record in input_json[CDMSConstants.observations_key]:
            if 'depth' in each_record:
                each_record['depth'] = float(each_record['depth'])
            if 'wind_from_direction' in each_record:
                each_record['wind_from_direction'] = float(each_record['wind_from_direction'])
            if 'wind_to_direction' in each_record:
                each_record['wind_to_direction'] = float(each_record['wind_to_direction'])
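        # build a Spark DataFrame writer from the observations, tagged with the job id, provider, and project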
        df_writer = self.create_df(
            self.__sss.retrieve_spark_session(
                self.__app_name,
                self.__master_spark),
            input_json[CDMSConstants.observations_key],
            job_id,
            input_json[CDMSConstants.provider_col],
            input_json[CDMSConstants.project_col])
        df_writer.mode(self.__mode).parquet(self.__parquet_name, compression='GZIP')  # compression options: snappy or GZIP
        LOGGER.debug('finished writing parquet')
        return len(input_json[CDMSConstants.observations_key])
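
For context, a minimal usage sketch follows. It is only illustrative: the class name `IngestNewJsonFile`, its no-argument constructor, and the literal keys `observations`, `provider`, and `project` are assumptions standing in for the real class in ingest_new_file.py and the constants in CDMSConstants.

    # Usage sketch; the names below are assumptions, not the verified API.
    import json
    import tempfile

    sample_payload = {
        'provider': 'provider-01',            # assumed literal for CDMSConstants.provider_col
        'project': 'project-01',              # assumed literal for CDMSConstants.project_col
        'observations': [                     # assumed literal for CDMSConstants.observations_key
            # numeric fields may arrive as strings; ingest() coerces them to float
            {'time': '2017-01-01T00:00:00Z', 'depth': '4.0', 'wind_from_direction': '75.2'},
        ],
    }

    with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as tmp_file:
        json.dump(sample_payload, tmp_file)

    ingester = IngestNewJsonFile()            # assumed class name and constructor signature
    record_count = ingester.ingest(tmp_file.name, 'job-0001')
    print(record_count)                       # 1 observation ingested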