in analysis/webservice/algorithms_spark/Matchup.py [0:0]
def calc(self, request, tornado_io_loop, **args):
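    """Handle a matchup request: parse the request arguments, create an
    execution record, launch the Spark matchup asynchronously, and redirect
    the caller to the job status page for the new execution."""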
    start = datetime.utcnow()
    # TODO Assuming Satellite primary
    bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
        start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
        depth_min, depth_max, time_tolerance, radius_tolerance, \
        platforms, match_once, prioritize_distance, filename = self.parse_arguments(request)
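
    # Capture the parsed request parameters; this dict is stored with the
    # execution record so the job's inputs can be reported back later.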
    args = {
        "primary": primary_ds_name,
        "matchup": secondary_ds_names,
        "startTime": start_time,
        "endTime": end_time,
        "bbox": request.get_argument('b'),
        "timeTolerance": time_tolerance,
        "radiusTolerance": float(radius_tolerance),
        "platforms": platforms,
        "parameter": parameter_s
    }

    if depth_min is not None:
        args["depthMin"] = float(depth_min)
    if depth_max is not None:
        args["depthMax"] = float(depth_max)
    with ResultsStorage(self.config) as resultsStorage:
        execution_id = str(resultsStorage.insertInitialExecution(
            params=args,
            startTime=start,
            status=ExecutionStatus.RUNNING.value
        ))
    self.log.debug("Querying for tiles in search domain")

    # Get the ids of primary-dataset tiles that fall within the search polygon
    # and time range; tile data itself is not fetched at this point.
    tile_ids = [tile.tile_id for tile in
                self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
                                                               start_seconds_from_epoch, end_seconds_from_epoch,
                                                               fetch_data=False, fl='id',
                                                               sort=['tile_min_time_dt asc', 'tile_min_lon asc',
                                                                     'tile_min_lat asc'], rows=5000)]
    self.log.info('Found %s tile_ids', len(tile_ids))
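
    # If the primary dataset has no tiles in the requested domain, record the
    # execution as FAILED so the job page can report the error to the caller.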
    if not tile_ids:
        # There are no matching tiles
        end = datetime.utcnow()
        with ResultsStorage(self.config) as storage:
            storage.updateExecution(
                uuid.UUID(execution_id),
                completeTime=end,
                status=ExecutionStatus.FAILED.value,
                message='No tiles matched the provided domain',
                stats=None,
                results=None
            )
    # Start async processing with Spark. Do not wait for the result before
    # returning to the user: async_calc is submitted to the request handler's
    # executor (a thread pool), so the Tornado IOLoop is not blocked while
    # the matchup runs.
    tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
        self.async_calc,
        execution_id=execution_id,
        tile_ids=tile_ids,
        bounding_polygon=bounding_polygon,
        primary_ds_name=primary_ds_name,
        secondary_ds_names=secondary_ds_names,
        parameter_s=parameter_s,
        start_time=start_time,
        end_time=end_time,
        depth_min=depth_min,
        depth_max=depth_max,
        time_tolerance=time_tolerance,
        radius_tolerance=radius_tolerance,
        platforms=platforms,
        match_once=match_once,
        start=start,
        prioritize_distance=prioritize_distance
    ))
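
    # Respond immediately by redirecting the client to the job status page for
    # this execution, passing along the requested output filename if one was given.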
    filename_param = f'&filename={filename}' if filename else ''
    request.requestHandler.redirect(f'/job?id={execution_id}{filename_param}')
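

# Illustrative client-side sketch (not part of this module): after the redirect
# above, a caller can poll the job status page for the returned execution id.
# The base URL, the polling interval, and the JSON response shape below are
# assumptions; only the '/job?id=<execution_id>' path comes from calc().
import time

import requests


def wait_for_matchup_job(base_url, execution_id, poll_seconds=10):
    """Poll the job endpoint until the execution reports a non-running status."""
    job_url = f'{base_url}/job?id={execution_id}'
    while True:
        response = requests.get(job_url)
        response.raise_for_status()
        body = response.json()  # assumed to include a 'status' field
        if body.get('status', '').lower() != 'running':
            return body
        time.sleep(poll_seconds)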