in analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py [0:0]
def query_by_parts(tile_service, min_lat, max_lat, min_lon, max_lon,
dataset, start_time, end_time, part_dim=0):
nexus_max_tiles_per_query = 100
# print 'trying query: ',min_lat, max_lat, min_lon, max_lon, \
# dataset, start_time, end_time
try:
tiles = \
tile_service.find_tiles_in_box(min_lat, max_lat,
min_lon, max_lon,
dataset,
start_time=start_time,
end_time=end_time,
fetch_data=False)
assert (len(tiles) <= nexus_max_tiles_per_query)
except:
# print 'failed query: ',min_lat, max_lat, min_lon, max_lon, \
# dataset, start_time, end_time
if part_dim == 0:
# Partition by latitude.
mid_lat = (min_lat + max_lat) / 2
nexus_tiles = NexusCalcSparkHandler.query_by_parts(tile_service,
min_lat, mid_lat,
min_lon, max_lon,
dataset,
start_time, end_time,
part_dim=part_dim)
nexus_tiles.extend(NexusCalcSparkHandler.query_by_parts(tile_service,
mid_lat,
max_lat,
min_lon,
max_lon,
dataset,
start_time,
end_time,
part_dim=part_dim))
elif part_dim == 1:
# Partition by longitude.
mid_lon = (min_lon + max_lon) / 2
nexus_tiles = NexusCalcSparkHandler.query_by_parts(tile_service,
min_lat, max_lat,
min_lon, mid_lon,
dataset,
start_time, end_time,
part_dim=part_dim)
nexus_tiles.extend(NexusCalcSparkHandler.query_by_parts(tile_service,
min_lat,
max_lat,
mid_lon,
max_lon,
dataset,
start_time,
end_time,
part_dim=part_dim))
elif part_dim == 2:
# Partition by time.
mid_time = (start_time + end_time) / 2
nexus_tiles = NexusCalcSparkHandler.query_by_parts(tile_service,
min_lat, max_lat,
min_lon, max_lon,
dataset,
start_time, mid_time,
part_dim=part_dim)
nexus_tiles.extend(NexusCalcSparkHandler.query_by_parts(tile_service,
min_lat,
max_lat,
min_lon,
max_lon,
dataset,
mid_time,
end_time,
part_dim=part_dim))
else:
# No exception, so query Cassandra for the tile data.
# print 'Making NEXUS query to Cassandra for %d tiles...' % \
# len(tiles)
# t1 = time.time()
# print 'NEXUS call start at time %f' % t1
# sys.stdout.flush()
nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
min_lon, max_lon,
nexus_tiles))
# t2 = time.time()
# print 'NEXUS call end at time %f' % t2
# print 'Seconds in NEXUS call: ', t2-t1
# sys.stdout.flush()
# print 'Returning %d tiles' % len(nexus_tiles)
return nexus_tiles