in analysis/webservice/algorithms_spark/TimeSeriesSpark.py
def calc(self, request, **args):
"""
:param request: StatsComputeOptions
:param args: dict
:return:
"""
start_time = datetime.now()
    (ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch,
     apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested,
     normalize_dates) = self.parse_arguments(request)
metrics_record = self._create_metrics_record()
resultsRaw = []
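    # Compute a time series independently for each requested dataset.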
for shortName in ds:
the_time = datetime.now()
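        # bounding_polygon.bounds is (min_lon, min_lat, max_lon, max_lat);
        # the tile service takes min_lat, max_lat, min_lon, max_lon.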
daysinrange = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
bounding_polygon.bounds[3],
bounding_polygon.bounds[0],
bounding_polygon.bounds[2],
shortName,
start_seconds_from_epoch,
end_seconds_from_epoch,
metrics_callback=metrics_record.record_metrics)
self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} days in range'.format(ndays))
for i, d in enumerate(daysinrange):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
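        # Decide how many Spark partitions to use for this dataset's computation.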
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
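        # Run the distributed computation: per-day statistics over the bounding polygon.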
results, meta = spark_driver(daysinrange, bounding_polygon,
shortName,
self._tile_service_factory,
metrics_record.record_metrics,
normalize_dates,
spark_nparts=spark_nparts,
sc=self._sc)
if apply_seasonal_cycle_filter:
the_time = datetime.now()
            # Get the time series for the companion climatology ("_clim") dataset
shortName_clim = shortName + "_clim"
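            # Climatology data spans a single nominal year, so search the window [0, SECONDS_IN_ONE_YEAR].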
daysinrange_clim = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
bounding_polygon.bounds[3],
bounding_polygon.bounds[0],
bounding_polygon.bounds[2],
shortName_clim,
0,
SECONDS_IN_ONE_YEAR,
metrics_callback=metrics_record.record_metrics)
if len(daysinrange_clim) == 0:
raise NexusProcessingException(reason="There is no climatology data present for dataset " + shortName + ".")
results_clim, _ = spark_driver(daysinrange_clim,
bounding_polygon,
shortName_clim,
self._tile_service_factory,
metrics_record.record_metrics,
normalize_dates=False,
spark_nparts=spark_nparts,
sc=self._sc)
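            # Index the climatology results by calendar month for the deseasoning step.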
clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
            if len(clim_indexed_by_month) < 12:
                raise NexusProcessingException(
                    reason="There are only %d months of climatology data for dataset %s. "
                           "A full year of climatology data is required for computing "
                           "deseasoned time series." % (len(clim_indexed_by_month), shortName))
for result in results:
month = datetime.utcfromtimestamp(result['time']).month
result['meanSeasonal'] = result['mean'] - clim_indexed_by_month[month]['mean']
result['minSeasonal'] = result['min'] - clim_indexed_by_month[month]['min']
result['maxSeasonal'] = result['max'] - clim_indexed_by_month[month]['max']
self.log.info(
"Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
the_time = datetime.now()
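        # Apply the low-pass filter (when requested) to the raw mean/max/min series.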
filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)
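        # When both filters are requested, additionally low-pass the deseasoned series.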
if apply_seasonal_cycle_filter and apply_low_pass_filter:
try:
filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
append="LowPass")
filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
append="LowPass")
filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
append="LowPass")
            except Exception:
                # If the filter fails, log the error but continue
                tb = traceback.format_exc()
                self.log.warning("Error calculating SeasonalLowPass filter:\n%s" % tb)
resultsRaw.append([results, meta])
self.log.info(
"LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
the_time = datetime.now()
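        # Dump the mean time series for this dataset to a NetCDF file.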
        self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean', fill=-9999.)
self.log.info(
"NetCDF generation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
the_time = datetime.now()
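    # Merge the per-dataset results into a single result set; with exactly two
    # datasets, also compute comparison statistics between them.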
results = self._mergeResults(resultsRaw)
if len(ds) == 2:
try:
stats = TimeSeriesSparkHandlerImpl.calculate_comparison_stats(results)
except Exception:
stats = {}
tb = traceback.format_exc()
                self.log.warning("Error when calculating comparison stats:\n%s" % tb)
else:
stats = {}
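    # Collect each dataset's metadata to return alongside the merged results.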
    meta = [singleRes[1] for singleRes in resultsRaw]
res = TimeSeriesResults(results=results, meta=meta, stats=stats,
computeOptions=None, minLat=bounding_polygon.bounds[1],
maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
endTime=end_seconds_from_epoch)
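    # Record the total wall-clock duration and emit the collected metrics.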
total_duration = (datetime.now() - start_time).total_seconds()
metrics_record.record_metrics(actual_time=total_duration)
metrics_record.print_metrics(logger)
self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
return res