in parquet_flask/parquet_stat_extractor/statistics_retriever.py [0:0]
def start(self):
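    """
    Collect dataset-wide statistics in place and return self.

    Runs one aggregation for the min/max of the latitude, longitude, depth, and
    observation-time columns, then counts non-null rows per observation key.
    """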
    # Min/max of the latitude, longitude, depth, and observation-time columns,
    # computed in a single aggregation pass.
    stats = self.__input_dataset.select(
        pyspark_functions.min(CDMSConstants.lat_col),
        pyspark_functions.max(CDMSConstants.lat_col),
        pyspark_functions.min(CDMSConstants.lon_col),
        pyspark_functions.max(CDMSConstants.lon_col),
        pyspark_functions.min(CDMSConstants.depth_col),
        pyspark_functions.max(CDMSConstants.depth_col),
        pyspark_functions.min(CDMSConstants.time_obj_col),
        pyspark_functions.max(CDMSConstants.time_obj_col)).collect()
    if len(stats) != 1:
        raise ValueError(f'expected exactly one row from the stats aggregation, got: {stats}')
    # Spark names unaliased aggregates 'min(<col>)' / 'max(<col>)', so the result
    # keys are rebuilt the same way here.
    stats = stats[0].asDict()
    self.min_lat = stats[f'min({CDMSConstants.lat_col})']
    self.max_lat = stats[f'max({CDMSConstants.lat_col})']
    self.min_lon = stats[f'min({CDMSConstants.lon_col})']
    self.max_lon = stats[f'max({CDMSConstants.lon_col})']
    self.min_depth = stats[f'min({CDMSConstants.depth_col})']
    self.max_depth = stats[f'max({CDMSConstants.depth_col})']
    # Time bounds come back as datetime objects; store them as epoch seconds.
    self.min_datetime = stats[f'min({CDMSConstants.time_obj_col})'].timestamp()
    self.max_datetime = stats[f'max({CDMSConstants.time_obj_col})'].timestamp()
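    # A minimal alternative sketch (an assumption, not this project's code):
    # aliasing each aggregate avoids rebuilding Spark's generated key names:
    #
    #     row = self.__input_dataset.select(
    #         pyspark_functions.min(CDMSConstants.lat_col).alias('min_lat'),
    #         pyspark_functions.max(CDMSConstants.lat_col).alias('max_lat'),
    #     ).collect()[0]
    #     min_lat, max_lat = row['min_lat'], row['max_lat']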
    # A minimum equal to the missing-value sentinel means the real minimum must be
    # recomputed with the sentinel rows excluded.
    if self.min_depth == CDMSConstants.missing_depth_value:
        self.__get_min_depth_exclude_missing_val()
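    # __get_min_depth_exclude_missing_val() is defined elsewhere in this class;
    # a plausible sketch (an assumption, not the original code) of the
    # recomputation it performs:
    #
    #     non_missing = self.__input_dataset.where(
    #         self.__input_dataset[CDMSConstants.depth_col] != CDMSConstants.missing_depth_value)
    #     self.min_depth = non_missing.select(
    #         pyspark_functions.min(CDMSConstants.depth_col)).collect()[0][0]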
    # Count non-null rows per observation key; a failure for one key is logged and
    # recorded as 0 instead of aborting the whole extraction.
    self.__observation_count = {}
    for each_obs_key in self.__observation_keys:
        try:
            obs_count = self.__input_dataset.where(self.__input_dataset[each_obs_key].isNotNull()).count()
        except Exception:
            LOGGER.exception(f'error while getting total for key: {each_obs_key}')
            obs_count = 0
        self.__observation_count[each_obs_key] = obs_count
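    # A single-pass alternative sketch (an assumption, not this project's code):
    # pyspark_functions.count() ignores nulls, so every per-key count could be
    # computed in one Spark job instead of one job per key:
    #
    #     counts_row = self.__input_dataset.select(
    #         *[pyspark_functions.count(k).alias(k) for k in self.__observation_keys]
    #     ).collect()[0]
    #     self.__observation_count = counts_row.asDict()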
    return self
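# Hypothetical usage sketch (class and argument names are assumptions inferred
# from the module name, not confirmed by the original file):
#
#     retriever = StatisticsRetriever(input_dataset, observation_keys).start()
#     bounding_box = (retriever.min_lon, retriever.min_lat,
#                     retriever.max_lon, retriever.max_lat)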