in analysis/webservice/algorithms_spark/MatchupDoms.py [0:0]
def calc(self, request, **args):
    """Run a matchup for ``request`` and return the matches as DomsQueryResults.

    The method performs these steps in order:

    1. Parse the query parameters with ``self.parse_arguments(request)``.
    2. Register an execution through ``ResultsStorage.insertExecution`` and
       keep its id (as a string) for the rest of the call.
    3. Look up the ids of the primary-dataset tiles that fall inside the
       bounding polygon and time range.
    4. Hand those tile ids to ``spark_matchup_driver``.
    5. Build the parameter and statistics dictionaries, convert the driver
       output with ``MatchupDoms.convert_to_matches`` and start a background
       thread that stores the results under the execution id.

    :param request: request object; passed to ``self.parse_arguments`` and
        queried with ``get_argument('b')`` for the raw bounding-box value.
    :param args: extra keyword arguments; accepted for interface
        compatibility and not read by this method.
    :return: a ``DomsQueryResults``. When ``result_size_limit`` is positive
        and smaller than the number of matches, the object carries
        ``results=None`` and ``status_code=202``; otherwise it carries the
        matches inline.
    :raises NexusProcessingException: with ``code=500`` when
        ``spark_matchup_driver`` raises; the original exception is attached
        as the cause.
    """
    start = datetime.utcnow()
    # TODO Assuming Satellite primary
    (bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s,
     start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch,
     depth_min, depth_max, time_tolerance, radius_tolerance,
     platforms, match_once, result_size_limit) = self.parse_arguments(request)

    # Register the execution first: its id is needed both for the stored
    # results and for the response built at the end.
    # NOTE(review): the storage context is assumed to cover only this call
    # (a second context is opened for the insert below) - confirm.
    with ResultsStorage(self.config) as resultsStorage:
        execution_id = str(resultsStorage.insertExecution(None, start, None, None))

    self.log.debug("Querying for tiles in search domain")
    # Get tile ids in box
    tiles = self._get_tile_service().find_tiles_in_polygon(
        bounding_polygon, primary_ds_name,
        start_seconds_from_epoch, end_seconds_from_epoch,
        fetch_data=False, fl='id',
        sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
        rows=5000)
    tile_ids = [tile.tile_id for tile in tiles]
    self.log.info('Found %s tile_ids', len(tile_ids))

    # Call spark_matchup
    self.log.debug("Calling Spark Driver")
    try:
        spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
                                            secondary_ds_names, parameter_s, depth_min, depth_max,
                                            time_tolerance, radius_tolerance, platforms, match_once,
                                            self.tile_service_factory, sc=self._sc)
    except Exception as e:
        self.log.exception(e)
        # Chain explicitly so the driver failure stays attached as the cause.
        raise NexusProcessingException(reason="An unknown error occurred while computing matches",
                                       code=500) from e

    end = datetime.utcnow()

    self.log.debug("Building and saving results")
    # Kept in a separate name so the ``**args`` parameter is not shadowed.
    params = {
        "primary": primary_ds_name,
        "matchup": secondary_ds_names,
        "startTime": start_time,
        "endTime": end_time,
        "bbox": request.get_argument('b'),
        "timeTolerance": time_tolerance,
        "radiusTolerance": float(radius_tolerance),
        "platforms": platforms,
        "parameter": parameter_s
    }

    # Depth bounds are optional; only report the ones that were supplied.
    if depth_min is not None:
        params["depthMin"] = float(depth_min)

    if depth_max is not None:
        params["depthMax"] = float(depth_max)

    # One key per matched primary point; each value holds its secondary matches.
    total_keys = len(spark_result)
    total_values = sum(len(v) for v in spark_result.values())
    details = {
        "timeToComplete": int((end - start).total_seconds()),
        "numSecondaryMatched": total_values,
        "numPrimaryMatched": total_keys
    }

    matches = MatchupDoms.convert_to_matches(spark_result)

    def do_result_insert():
        # Runs on a background thread so the response is not delayed by the
        # write. Log failures through the application logger, then re-raise
        # so any installed thread exception hook still sees them.
        try:
            with ResultsStorage(self.config) as storage:
                storage.insertResults(results=matches, params=params, stats=details,
                                      startTime=start, completeTime=end, userEmail="",
                                      execution_id=execution_id)
        except Exception:
            self.log.exception("Failed to store matchup results for execution %s", execution_id)
            raise

    threading.Thread(target=do_result_insert).start()

    # A positive size limit that the match count exceeds suppresses the
    # inline results; the response then carries status_code=202.
    if 0 < result_size_limit < len(matches):
        return DomsQueryResults(results=None, args=params, details=details, bounds=None, count=None,
                                computeOptions=None, executionId=execution_id, status_code=202)

    return DomsQueryResults(results=matches, args=params, details=details, bounds=None, count=None,
                            computeOptions=None, executionId=execution_id)