def start()

in parquet_flask/parquet_stat_extractor/statistics_retriever_wrapper.py [0:0]


    def start(self, parquet_path):
        from parquet_flask.io_logic.retrieve_spark_session import RetrieveSparkSession

        # Reuse (or create) the Spark session configured for this app / master.
        spark: SparkSession = RetrieveSparkSession().retrieve_spark_session(self.__app_name, self.__master_spark)
        # parquet_path is a sub-path under the base parquet location.
        full_parquet_path = f"{self.__parquet_name}/{parquet_path}"
        LOGGER.debug(f'searching for full_parquet_path: {full_parquet_path}')
        try:
            # Load the CDMS in-situ JSON schema and convert it to a Spark StructType
            # so the parquet files are read with an explicit schema.
            insitu_schema = FileUtils.read_json(Config().get_value(Config.in_situ_schema))
            cdms_spark_struct = CdmsSchema().get_schema_from_json(insitu_schema)
            read_df: DataFrame = spark.read.schema(cdms_spark_struct).parquet(full_parquet_path)
        except AnalysisException as analysis_exception:
            # A missing path is an expected case: report it as "no statistics" (None).
            if analysis_exception.desc is not None and analysis_exception.desc.startswith('Path does not exist'):
                LOGGER.debug(f'no such full_parquet_path: {full_parquet_path}')
                return None
            # Any other analysis error is unexpected; log it and propagate.
            LOGGER.exception(f'error while retrieving full_parquet_path: {full_parquet_path}')
            raise analysis_exception
        # Compute statistics over the observation columns and return them as JSON.
        stats = StatisticsRetriever(read_df, CdmsSchema().get_observation_names(insitu_schema)).start()
        return stats.to_json()
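
The core pattern above is "read a parquet path with an explicit schema, and treat a missing path as an empty result instead of an error." Below is a minimal, self-contained sketch of that pattern using plain PySpark APIs; the function name `read_optional_parquet`, the demo schema, and the demo paths are illustrative only and are not part of this project. It assumes a PySpark version where `AnalysisException.desc` exists and missing-path errors start with `Path does not exist` (as in the excerpt); newer Spark releases may word the message differently.

    from typing import Optional

    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.types import StructType, StructField, DoubleType
    from pyspark.sql.utils import AnalysisException


    def read_optional_parquet(spark: SparkSession, path: str, schema: StructType) -> Optional[DataFrame]:
        """Return a DataFrame for `path`, or None if the parquet path does not exist."""
        try:
            return spark.read.schema(schema).parquet(path)
        except AnalysisException as error:
            # Spark raises AnalysisException for missing paths; other analysis
            # errors (e.g. schema problems) are re-raised to the caller.
            if error.desc is not None and error.desc.startswith('Path does not exist'):
                return None
            raise


    if __name__ == '__main__':
        spark = SparkSession.builder.master('local[1]').appName('stats-demo').getOrCreate()
        demo_schema = StructType([
            StructField('latitude', DoubleType()),
            StructField('longitude', DoubleType()),
        ])
        df = read_optional_parquet(spark, '/tmp/no/such/parquet', demo_schema)
        print('missing path handled:', df is None)
        spark.stop()

Keeping the "missing path" branch separate from the generic error branch lets callers of `start()` distinguish "no data for this partition" (a `None` return) from a genuine read failure, which is logged and re-raised.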