in parquet_flask/io_logic/raw_query.py [0:0]
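# NOTE: this excerpt assumes the surrounding module already imports `datetime.datetime`,
# pyspark.sql's `DataFrame`, the module-level `LOGGER`, and `CDMSConstants`
# (the module's import block is not part of this excerpt).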
def search(self, conditions: str, spark_session=None):
"""
:param conditions: str - SQL conditions string
:param spark_session:
:return:
"""
query_begin_time = datetime.now()
LOGGER.debug(f'query begins at {query_begin_time}')
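    # reuse the caller-provided spark session when given; otherwise retrieve one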
spark = self.__retrieve_spark() if spark_session is None else spark_session
created_spark_session_time = datetime.now()
LOGGER.debug(f'spark session created at {created_spark_session_time}. duration: {created_spark_session_time - query_begin_time}')
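    # spark.read.parquet is lazy: it resolves the schema from the parquet footers here,
    # but no row data is scanned until an action runs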
read_df: DataFrame = spark.read.parquet(self.__parquet_name)
read_df_time = datetime.now()
LOGGER.debug(f'parquet read created at {read_df_time}. duration: {read_df_time - created_spark_session_time}')
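    # apply the SQL condition string as a filter; still lazy, so Spark can push the
    # predicates down to the parquet scan. coalesce(1) collapses the plan to one partition.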
query_result = read_df.where(conditions)
query_result = query_result.coalesce(1)
query_time = datetime.now()
LOGGER.debug(f'parquet read filtered at {query_time}. duration: {query_time - read_df_time}')
LOGGER.debug(f'total duration: {query_time - query_begin_time}')
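    # count() is an action and triggers the actual query execution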
    total_result = int(query_result.count())
LOGGER.debug(f'total calc count duration: {datetime.now() - query_time}')
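    # size < 1 means the caller only asked for the match count, not the rows themselves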
if self.__props.size < 1:
LOGGER.debug(f'returning only the size: {total_result}')
return {
'total': total_result,
'results': [],
}
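    # reset the timer for the row-retrieval phase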
query_time = datetime.now()
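    # internal time/partition helper columns that should not appear in the returned rows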
removing_cols = [CDMSConstants.time_obj_col, CDMSConstants.year_col, CDMSConstants.month_col]
    if len(self.__props.columns) > 0:
        # project to the requested columns before paging
        query_result = query_result.select(self.__props.columns)
    LOGGER.debug(f'returning size: {total_result}')
    # page by limiting to the first (start_at + size) rows, then keeping the last `size`;
    # tail() collects those rows to the driver as a list of Row objects (no stable order
    # is guaranteed without an explicit orderBy)
    result = query_result.limit(self.__props.start_at + self.__props.size).drop(*removing_cols).tail(self.__props.size)
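    # unpersist() is a safe no-op unless the DataFrame was explicitly cached earlier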
query_result.unpersist()
LOGGER.debug(f'total retrieval duration: {datetime.now() - query_time}')
    # the spark session is intentionally left running: it may be caller-provided or reused
return {
'total': total_result,
'results': [k.asDict() for k in result],
}