in analysis/webservice/algorithms/TimeSeries.py [0:0]
def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch,
                                          end_seconds_from_epoch,
                                          apply_seasonal_cycle_filter=True, apply_low_pass_filter=True):
    """Compute per-day statistics for one dataset within a bounding polygon.

    The days to process are obtained from the tile service; one result is
    then computed per day via ``calc_average_on_day``, either in-process
    (when the configured ``maxprocesses`` is 1) or through a pool of worker
    processes fed by a shared work queue. The per-day results are sorted by
    their ``"time"`` entry and then passed through the seasonal and/or
    low-pass filters.

    :param bounding_polygon: geometry exposing ``.bounds`` (indexed 0..3) and
        ``.wkt``; ``bounds[1]``, ``bounds[3]``, ``bounds[0]``, ``bounds[2]``
        are forwarded, in that order, to ``find_days_in_range_asc``.
    :param ds: dataset identifier forwarded to the tile service and to the
        per-day calculation.
    :param start_seconds_from_epoch: start of the time range, forwarded to
        the tile service.
    :param end_seconds_from_epoch: end of the time range, forwarded to the
        tile service.
    :param apply_seasonal_cycle_filter: when true, each result additionally
        gets ``meanSeasonal``/``minSeasonal``/``maxSeasonal`` entries holding
        the value minus the corresponding monthly average.
    :param apply_low_pass_filter: forwarded as ``applyLowPass`` to the
        filters run on ``mean``/``max``/``min``; when combined with the
        seasonal filter, the seasonal fields are low-pass filtered as well.
    :return: tuple ``(results, {})`` where ``results`` is the list of
        per-day result dicts sorted by ``"time"``.
    :raises NoDataException: if the tile service reports no days in range.
    :raises NexusProcessingException: if a worker process reports an error.
    """
    the_time = datetime.now()
    bounds = bounding_polygon.bounds
    daysinrange = self._get_tile_service().find_days_in_range_asc(bounds[1],
                                                                  bounds[3],
                                                                  bounds[0],
                                                                  bounds[2],
                                                                  ds,
                                                                  start_seconds_from_epoch,
                                                                  end_seconds_from_epoch)
    logger.info("Finding days in range took %s for dataset %s", str(datetime.now() - the_time), ds)

    if len(daysinrange) == 0:
        raise NoDataException(reason="No data found for selected timeframe")

    the_time = datetime.now()
    maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))

    results = []
    if maxprocesses == 1:
        calculator = TimeSeriesCalculator()
        for dayinseconds in daysinrange:
            result = calculator.calc_average_on_day(bounding_polygon.wkt, ds, dayinseconds)
            # Falsy results are dropped.
            if result:
                results.append(result)
    else:
        manager = Manager()
        pool = None
        try:
            # Create a task to calc the average for each day
            work_queue = manager.Queue()
            done_queue = manager.Queue()
            for dayinseconds in daysinrange:
                work_queue.put(('calc_average_on_day', bounding_polygon.wkt, ds, dayinseconds))
            # One sentinel is queued per worker process.
            for _ in range(maxprocesses):
                work_queue.put(SENTINEL)

            # Start new processes to handle the work
            pool = Pool(maxprocesses)
            for _ in range(maxprocesses):
                pool.apply_async(pool_worker, (work_queue, done_queue))
            pool.close()

            # Collect exactly one result per submitted day
            for _ in range(len(daysinrange)):
                result = done_queue.get()
                if not result:
                    # Skip falsy results before inspecting them, so a None
                    # result is dropped instead of raising TypeError.
                    continue
                if 'error' in result:
                    logger.error(result['error'])
                    raise NexusProcessingException(reason="Error calculating average by day.")
                results.append(result)
        finally:
            # Always release the worker processes and the manager, including
            # when a worker reported an error and we are propagating it.
            if pool is not None:
                pool.terminate()
            manager.shutdown()

    results = sorted(results, key=lambda entry: entry["time"])
    logger.info("Time series calculation took %s for dataset %s", str(datetime.now() - the_time), ds)

    if apply_seasonal_cycle_filter:
        the_time = datetime.now()
        for result in results:
            # result['time'] is interpreted as seconds since the epoch (UTC)
            month = datetime.utcfromtimestamp(result['time']).month
            month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt, ds)
            result['meanSeasonal'] = result['mean'] - month_mean
            result['minSeasonal'] = result['min'] - month_min
            result['maxSeasonal'] = result['max'] - month_max
        logger.info("Seasonal calculation took %s for dataset %s", str(datetime.now() - the_time), ds)

    the_time = datetime.now()
    for field in ('mean', 'max', 'min'):
        filtering.applyAllFiltersOnField(results, field, applySeasonal=False,
                                         applyLowPass=apply_low_pass_filter)

    if apply_seasonal_cycle_filter and apply_low_pass_filter:
        try:
            for field in ('meanSeasonal', 'minSeasonal', 'maxSeasonal'):
                filtering.applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=True,
                                              append="LowPass")
        except Exception:
            # Deliberately best-effort: if it doesn't work, log the error but ignore it
            tb = traceback.format_exc()
            logger.warning("Error calculating SeasonalLowPass filter:\n%s", tb)

    logger.info("LowPass filter calculation took %s for dataset %s", str(datetime.now() - the_time), ds)

    return results, {}