in parquet_flask/io_logic/query_v4.py [0:0]
def get_unioned_read_df(self, condition_manager: ParquetQueryConditionManagementV4, spark: SparkSession) -> DataFrame:
    # Build the Spark schema once from the configured in-situ JSON schema.
    cdms_spark_struct = CdmsSchema().get_schema_from_json(FileUtils.read_json(Config().get_value(Config.in_situ_schema)))
    if len(condition_manager.parquet_names) < 1:
        LOGGER.fatal('cannot find any parquet paths in ES. returning None instead of searching the entire parquet directory for now.')
        return None
    # Previous single-path implementation, kept for reference:
    # read_df: DataFrame = spark.read.schema(cdms_spark_struct).parquet(condition_manager.parquet_name)
    # return read_df
    read_df_list = []
    distinct_parquet_names = self.__strip_duplicates_maintain_order(condition_manager)
    for each in distinct_parquet_names:
        each: PartitionedParquetPath = each  # re-annotate loop variable for type clarity
        try:
            # Read each partition path with the shared schema, then materialize
            # the partition keys as literal columns so every DataFrame in the
            # list ends up with an identical set of columns.
            temp_df: DataFrame = spark.read.schema(cdms_spark_struct).parquet(each.generate_path())
            for k, v in each.get_df_columns().items():
                temp_df = temp_df.withColumn(k, lit(v))
            read_df_list.append(temp_df)
        except Exception:
            # A missing or unreadable partition is logged and skipped rather
            # than failing the entire query.
            LOGGER.exception(f'failed to retrieve data from spark for: {each.generate_path()}')
    if len(read_df_list) < 1:
        return None
    # Fold the per-partition DataFrames into a single unioned DataFrame.
    main_read_df: DataFrame = read_df_list[0]
    for each in read_df_list[1:]:
        main_read_df = main_read_df.union(each)
    return main_read_df
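
The helper `__strip_duplicates_maintain_order` is called above but its body is not shown in this excerpt. Below is a minimal sketch of what such a helper might look like, assuming `condition_manager.parquet_names` holds `PartitionedParquetPath` objects and that `generate_path()` is a suitable deduplication key; both the keying choice and the exact signature are assumptions for illustration, not the project's confirmed implementation.

from typing import List

def __strip_duplicates_maintain_order(self, condition_manager: ParquetQueryConditionManagementV4) -> List[PartitionedParquetPath]:
    # Deduplicate by generated path while preserving first-seen order, so
    # the same parquet partition is never read and unioned twice.
    # NOTE: keying on generate_path() is an assumption made for this sketch.
    seen = set()
    distinct = []
    for each in condition_manager.parquet_names:
        path = each.generate_path()
        if path not in seen:
            seen.add(path)
            distinct.append(each)
    return distinct

Deduplicating before the read loop matters because Spark's DataFrame.union behaves like SQL UNION ALL and does not drop duplicate rows, so reading the same partition twice would double-count its records. Note also that union resolves columns by position rather than by name, which is why every per-partition DataFrame must append its literal partition columns consistently; unionByName is the name-based alternative.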