def calc()

in analysis/webservice/algorithms_spark/MatchupDoms.py [0:0]


    def calc(self, request, **args):
        """Run a matchup for the given request and return the matches.

        The method parses the request, registers an execution record through
        ``ResultsStorage.insertExecution``, looks up the primary dataset tiles
        inside the bounding polygon and time range, hands the tile ids to
        ``spark_matchup_driver``, and converts the driver output with
        ``MatchupDoms.convert_to_matches``. The converted matches are saved
        through ``ResultsStorage.insertResults`` on a background thread, so
        this method returns without waiting for the save to finish.

        :param request: request object; it is passed to
            ``self.parse_arguments`` and queried with ``get_argument('b')``
            for the bounding box that is echoed back in the result arguments.
        :param args: extra keyword arguments; not read by this method.
        :return: a ``DomsQueryResults``. When ``result_size_limit`` is positive
            and smaller than the number of matches, the result carries
            ``results=None`` and ``status_code=202``; otherwise it carries the
            full list of matches.
        :raises NexusProcessingException: with code 500 when the Spark driver
            raises any exception; the original exception is chained as the
            cause.
        """
        start = datetime.utcnow()
        # TODO Assuming Satellite primary
        (bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s,
         start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch,
         depth_min, depth_max, time_tolerance, radius_tolerance,
         platforms, match_once, result_size_limit) = self.parse_arguments(request)

        # Register the execution up front so an id exists before the
        # computation starts; the results are attached to this id later.
        with ResultsStorage(self.config) as resultsStorage:
            execution_id = str(resultsStorage.insertExecution(None, start, None, None))

        self.log.debug("Querying for tiles in search domain")
        # Get tile ids in box
        tile_ids = [tile.tile_id for tile in
                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
                                                             start_seconds_from_epoch, end_seconds_from_epoch,
                                                             fetch_data=False, fl='id',
                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
                                                                   'tile_min_lat asc'], rows=5000)]

        self.log.info('Found %s tile_ids', len(tile_ids))
        # Call spark_matchup
        self.log.debug("Calling Spark Driver")
        try:
            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
                                                secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
                                                radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
        except Exception as e:
            self.log.exception(e)
            # Chain the original error so the root cause stays attached to
            # the generic exception that is surfaced to the caller.
            raise NexusProcessingException(reason="An unknown error occurred while computing matches",
                                           code=500) from e

        end = datetime.utcnow()

        self.log.debug("Building and saving results")
        # Named ``params`` rather than ``args`` so the ``**args`` parameter is
        # not silently rebound.
        params = {
            "primary": primary_ds_name,
            "matchup": secondary_ds_names,
            "startTime": start_time,
            "endTime": end_time,
            "bbox": request.get_argument('b'),
            "timeTolerance": time_tolerance,
            "radiusTolerance": float(radius_tolerance),
            "platforms": platforms,
            "parameter": parameter_s
        }

        # Depth bounds are only reported when they were supplied.
        if depth_min is not None:
            params["depthMin"] = float(depth_min)

        if depth_max is not None:
            params["depthMax"] = float(depth_max)

        # spark_result is used as a mapping: one key per primary point, each
        # value a sized collection of the secondary points matched to it.
        total_keys = len(spark_result)
        total_values = sum(len(v) for v in spark_result.values())
        details = {
            "timeToComplete": int((end - start).total_seconds()),
            "numSecondaryMatched": total_values,
            "numPrimaryMatched": total_keys
        }

        matches = MatchupDoms.convert_to_matches(spark_result)

        def do_result_insert():
            # Runs on a background thread. Log failures through the
            # application logger, then re-raise so the thread still ends
            # with the exception as it did before.
            try:
                with ResultsStorage(self.config) as storage:
                    storage.insertResults(results=matches, params=params, stats=details,
                                          startTime=start, completeTime=end, userEmail="",
                                          execution_id=execution_id)
            except Exception:
                self.log.exception("Failed to save results for execution %s", execution_id)
                raise

        threading.Thread(target=do_result_insert).start()

        # If the result set exceeds a positive size limit, return only the
        # metadata with a 202 status instead of the full list of matches.
        if 0 < result_size_limit < len(matches):
            result = DomsQueryResults(results=None, args=params, details=details, bounds=None, count=None,
                                      computeOptions=None, executionId=execution_id, status_code=202)
        else:
            result = DomsQueryResults(results=matches, args=params, details=details, bounds=None, count=None,
                                      computeOptions=None, executionId=execution_id)

        return result