def get_unioned_read_df()

in parquet_flask/io_logic/query_v4.py [0:0]
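
Reads every partitioned parquet path resolved from Elasticsearch into a Spark DataFrame using the explicit CDMS schema, re-attaches the partition values as literal columns, and unions the per-path DataFrames into a single result. Returns None when no paths are found or every read fails.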


    def get_unioned_read_df(self, condition_manager: ParquetQueryConditionManagementV4, spark: SparkSession) -> DataFrame:
        # build the explicit CDMS schema from the configured in-situ JSON schema file
        cdms_spark_struct = CdmsSchema().get_schema_from_json(FileUtils.read_json(Config().get_value(Config.in_situ_schema)))
        if len(condition_manager.parquet_names) < 1:
            LOGGER.fatal('cannot find any parquet paths in ES. returning None instead of searching the entire parquet directory for now.')
            return None
            # read_df: DataFrame = spark.read.schema(cdms_spark_struct).parquet(condition_manager.parquet_name)
            # return read_df
        read_df_list = []
        # deduplicate the resolved parquet paths while preserving their order
        distinct_parquet_names = self.__strip_duplicates_maintain_order(condition_manager)
        for each in distinct_parquet_names:  # each: PartitionedParquetPath
            try:
                # read this partition path with the explicit CDMS schema instead of inferring one
                temp_df: DataFrame = spark.read.schema(cdms_spark_struct).parquet(each.generate_path())
                # partition values are not stored in the data files, so re-attach them as literal columns
                for k, v in each.get_df_columns().items():
                    temp_df = temp_df.withColumn(k, lit(v))
                read_df_list.append(temp_df)
            except Exception:
                LOGGER.exception(f'failed to retrieve data from spark for: {each.generate_path()}')
        if len(read_df_list) < 1:
            # every read failed or was skipped, so there is nothing to union
            return None
        main_read_df: DataFrame = read_df_list[0]
        # union the per-path DataFrames; union is positional, so the shared explicit schema keeps the columns aligned
        for each in read_df_list[1:]:
            main_read_df = main_read_df.union(each)
        return main_read_df
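
Since the method depends on several project classes (ParquetQueryConditionManagementV4, PartitionedParquetPath, CdmsSchema), here is a minimal, self-contained sketch of the same read, annotate, and union pattern. All schemas, rows, and partition values below are hypothetical stand-ins, and functools.reduce(DataFrame.union, ...) is used in place of the method's explicit union loop.

    from functools import reduce

    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql.functions import lit
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    spark = SparkSession.builder.appName('union-sketch').getOrCreate()

    # hypothetical stand-in for the CDMS schema that query_v4 loads from JSON
    schema = StructType([
        StructField('time', StringType()),
        StructField('air_temperature', DoubleType()),
    ])

    # hypothetical stand-ins for two partitioned parquet reads; in query_v4
    # each DataFrame would come from spark.read.schema(...).parquet(path)
    partitions = [
        ([('2020-01-01T00:00:00Z', 12.5)], {'provider': 'p1', 'project': 'x'}),
        ([('2020-01-02T00:00:00Z', 13.1)], {'provider': 'p2', 'project': 'x'}),
    ]

    read_df_list = []
    for rows, partition_values in partitions:
        temp_df = spark.createDataFrame(rows, schema)
        # partition values are not stored in the data files, so they are
        # re-attached as constant columns, mirroring the withColumn/lit loop
        for k, v in partition_values.items():
            temp_df = temp_df.withColumn(k, lit(v))
        read_df_list.append(temp_df)

    # DataFrame.union matches columns by position, so the shared schema
    # (plus identically added columns) is what makes the union safe
    main_read_df = reduce(DataFrame.union, read_df_list)
    main_read_df.show()

The reduce call is behaviorally equivalent to the method's explicit loop; because union is positional rather than name-based, applying the same explicit schema to every read is what keeps the combined result well-formed.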