def calc()

in analysis/webservice/algorithms_spark/TimeSeriesSpark.py [0:0]


    def calc(self, request, **args):
        """

        :param request: StatsComputeOptions
        :param args: dict
        :return:
        """
        start_time = datetime.now()
        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates = self.parse_arguments(
            request)
        metrics_record = self._create_metrics_record()

        resultsRaw = []

        for shortName in ds:

            the_time = datetime.now()
            daysinrange = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
                                                                          bounding_polygon.bounds[3],
                                                                          bounding_polygon.bounds[0],
                                                                          bounding_polygon.bounds[2],
                                                                          shortName,
                                                                          start_seconds_from_epoch,
                                                                          end_seconds_from_epoch,
                                                                          metrics_callback=metrics_record.record_metrics)
            self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            ndays = len(daysinrange)
            if ndays == 0:
                raise NoDataException(reason="No data found for selected timeframe")

            self.log.debug('Found {0} days in range'.format(ndays))
            for i, d in enumerate(daysinrange):
                self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
            spark_nparts = self._spark_nparts(nparts_requested)
            self.log.info('Using {} partitions'.format(spark_nparts))
            results, meta = spark_driver(daysinrange, bounding_polygon,
                                         shortName,
                                         self._tile_service_factory,
                                         metrics_record.record_metrics,
                                         normalize_dates,
                                         spark_nparts=spark_nparts,
                                         sc=self._sc)

            if apply_seasonal_cycle_filter:
                the_time = datetime.now()
                # get time series for _clim dataset
                shortName_clim = shortName + "_clim"
                daysinrange_clim = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
                                                                                   bounding_polygon.bounds[3],
                                                                                   bounding_polygon.bounds[0],
                                                                                   bounding_polygon.bounds[2],
                                                                                   shortName_clim,
                                                                                   0,
                                                                                   SECONDS_IN_ONE_YEAR,
                                                                                   metrics_callback=metrics_record.record_metrics)
                if len(daysinrange_clim) == 0:
                    raise NexusProcessingException(reason="There is no climatology data present for dataset " + shortName + ".") 
                results_clim, _ = spark_driver(daysinrange_clim,
                                               bounding_polygon,
                                               shortName_clim,
                                               self._tile_service_factory,
                                               metrics_record.record_metrics,
                                               normalize_dates=False,
                                               spark_nparts=spark_nparts,
                                               sc=self._sc)
                clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
                if len(clim_indexed_by_month) < 12:
                    raise NexusProcessingException(reason="There are only " +
                                                   len(clim_indexed_by_month) + " months of climatology data for dataset " + 
                                                   shortName + ". A full year of climatology data is required for computing deseasoned timeseries.")

                for result in results:
                    month = datetime.utcfromtimestamp(result['time']).month

                    result['meanSeasonal'] = result['mean'] - clim_indexed_by_month[month]['mean']
                    result['minSeasonal'] = result['min'] - clim_indexed_by_month[month]['min']
                    result['maxSeasonal'] = result['max'] - clim_indexed_by_month[month]['max']
                self.log.info(
                    "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)

            if apply_seasonal_cycle_filter and apply_low_pass_filter:
                try:
                    filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                except Exception as e:
                    # If it doesn't work log the error but ignore it
                    tb = traceback.format_exc()
                    self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)

            resultsRaw.append([results, meta])
            self.log.info(
                "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean',
                                        fill=-9999.)
            self.log.info(
                "NetCDF generation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

        the_time = datetime.now()
        results = self._mergeResults(resultsRaw)

        if len(ds) == 2:
            try:
                stats = TimeSeriesSparkHandlerImpl.calculate_comparison_stats(results)
            except Exception:
                stats = {}
                tb = traceback.format_exc()
                self.log.warn("Error when calculating comparison stats:\n%s" % tb)
        else:
            stats = {}

        meta = []
        for singleRes in resultsRaw:
            meta.append(singleRes[1])

        res = TimeSeriesResults(results=results, meta=meta, stats=stats,
                                computeOptions=None, minLat=bounding_polygon.bounds[1],
                                maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
                                maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
                                endTime=end_seconds_from_epoch)

        total_duration = (datetime.now() - start_time).total_seconds()
        metrics_record.record_metrics(actual_time=total_duration)
        metrics_record.print_metrics(logger)

        self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
        return res