in analysis/webservice/algorithms/DailyDifferenceAverage.py [0:0]
def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
                                         start_time,
                                         end_time):
    """Compute the per-day average difference between two datasets in a box.

    The days to process are looked up from the tile service for ``dataset1``
    within the bounding box and time range. Each day is then handed to
    ``calc_average_diff_on_day``, either in-process (when the configured
    ``multiprocessing.maxprocesses`` is 1) or through a pool of worker
    processes fed by a shared work queue.

    :param min_lat: lower latitude bound of the box
    :param max_lat: upper latitude bound of the box
    :param min_lon: lower longitude bound of the box
    :param max_lon: upper longitude bound of the box
    :param dataset1: first dataset; also used to find the days in range
    :param dataset2: second dataset
    :param start_time: start of the time range passed to the tile service
    :param end_time: end of the time range passed to the tile service
    :return: list of 2-tuples holding the first two elements of each
        per-day result (presumably ``(day, average difference)`` -- the
        element meanings and units are defined by the calculator; confirm
        there). In the multi-process case the entries are appended in the
        order they arrive on the result queue, which need not match the
        order of the days.
    :raises NexusProcessingException: if a worker reports an ``'error'``
        result.
    """
    daysinrange = self._get_tile_service().find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1,
                                                                  start_time, end_time)

    maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))

    # Nothing to compute: skip building a calculator or spinning up worker
    # processes. len() is used rather than truthiness because the original
    # code only relies on daysinrange supporting len() and iteration.
    if len(daysinrange) == 0:
        return []

    if maxprocesses == 1:
        calculator = DailyDifferenceAverageCalculator()
        averagebyday = []
        for dayinseconds in daysinrange:
            result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
                                                         dayinseconds)
            averagebyday.append((result[0], result[1]))
        return averagebyday

    manager = Manager()
    try:
        work_queue = manager.Queue()
        done_queue = manager.Queue()

        # Create a task to calc average difference for each day
        for dayinseconds in daysinrange:
            work_queue.put(
                ('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds))
        # One sentinel per worker is queued after the real tasks.
        for _ in range(maxprocesses):
            work_queue.put(SENTINEL)

        # Start new processes to handle the work
        pool = Pool(maxprocesses)
        try:
            for _ in range(maxprocesses):
                pool.apply_async(pool_worker, (work_queue, done_queue))
            pool.close()

            # Collect exactly one result per submitted day.
            averagebyday = []
            for _ in range(len(daysinrange)):
                result = done_queue.get()
                if result[0] == 'error':
                    print(result[1], file=sys.stderr)
                    raise NexusProcessingException(reason="Error calculating average by day.")
                averagebyday.append((result[0], result[1]))
        finally:
            # Always tear the workers down, including on the error path;
            # previously a raised exception skipped this and leaked processes.
            pool.terminate()
    finally:
        # The manager owns the queues' server process; shut it down last.
        manager.shutdown()

    return averagebyday