in analysis/webservice/algorithms/TimeSeriesSolr.py [0:0]
def getTimeSeriesStatsForBoxSingleDataSet(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0, end_time=-1,
applySeasonalFilter=True, applyLowPass=True):
daysinrange = self._get_tile_service().find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
end_time)
if len(daysinrange) == 0:
raise NoDataException(reason="No data found for selected timeframe")
maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
results = []
if maxprocesses == 1:
calculator = TimeSeriesCalculator()
for dayinseconds in daysinrange:
result = calculator.calc_average_on_day(min_lat, max_lat, min_lon, max_lon, ds, dayinseconds)
results.append(result)
else:
# Create a task to calc average difference for each day
manager = Manager()
work_queue = manager.Queue()
done_queue = manager.Queue()
for dayinseconds in daysinrange:
work_queue.put(
('calc_average_on_day', min_lat, max_lat, min_lon, max_lon, ds, dayinseconds))
[work_queue.put(SENTINEL) for _ in range(0, maxprocesses)]
# Start new processes to handle the work
pool = Pool(maxprocesses)
[pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in range(0, maxprocesses)]
pool.close()
# Collect the results as [(day (in ms), average difference for that day)]
for i in range(0, len(daysinrange)):
result = done_queue.get()
try:
error_str = result['error']
logger.error(error_str)
raise NexusProcessingException(reason="Error calculating average by day.")
except KeyError:
pass
results.append(result)
pool.terminate()
manager.shutdown()
results = sorted(results, key=lambda entry: entry["time"])
filt.applyAllFiltersOnField(results, 'mean', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
filt.applyAllFiltersOnField(results, 'max', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
filt.applyAllFiltersOnField(results, 'min', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
return results, {}