data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py (93 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import boto3
import nexusproto.DataTile_pb2 as nexusproto
import numpy as np
from nexusproto.serialization import from_shaped_array
class NexusTileData(object):
__nexus_tile = None
__data = None
tile_id = None
def __init__(self, data, _tile_id):
if self.__data is None:
self.__data = data
if self.tile_id is None:
self.tile_id = _tile_id
def _get_nexus_tile(self):
if self.__nexus_tile is None:
self.__nexus_tile = nexusproto.TileData.FromString(self.__data)
return self.__nexus_tile
def get_raw_data_array(self):
nexus_tile = self._get_nexus_tile()
the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
the_tile_data = getattr(nexus_tile.tile, the_tile_type)
return from_shaped_array(the_tile_data.variable_data)
def get_lat_lon_time_data_meta(self):
if self._get_nexus_tile().HasField('grid_tile'):
grid_tile = self._get_nexus_tile().grid_tile
grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
if len(grid_tile_data.shape) == 2:
grid_tile_data = grid_tile_data[np.newaxis, :]
# Extract the meta data
meta_data = {}
for meta_data_obj in grid_tile.meta_data:
name = meta_data_obj.name
meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
if len(meta_array.shape) == 2:
meta_array = meta_array[np.newaxis, :]
meta_data[name] = meta_array
return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
elif self._get_nexus_tile().HasField('swath_tile'):
swath_tile = self._get_nexus_tile().swath_tile
latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
# Simplify the tile if the time dimension is the same value repeated
if np.all(time_data == np.min(time_data)):
time_data = np.array([np.min(time_data)])
swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
tile_data = self._to_standard_index(swath_tile_data,
(len(time_data), len(latitude_data), len(longitude_data)))
# Extract the meta data
meta_data = {}
for meta_data_obj in swath_tile.meta_data:
name = meta_data_obj.name
actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
meta_data[name] = reshaped_meta_array
return latitude_data, longitude_data, time_data, tile_data, meta_data
else:
raise NotImplementedError("Only supports grid_tile and swath_tile")
@staticmethod
def _to_standard_index(data_array, desired_shape):
if desired_shape[0] == 1:
reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
row, col = np.indices(data_array.shape)
reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
row.flat, col.flat]
reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
row.flat, col.flat]
reshaped_array = reshaped_array[np.newaxis, :]
else:
reshaped_array = np.ma.masked_all(desired_shape)
row, col = np.indices(data_array.shape)
reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
row.flat, col.flat]
reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
row.flat, col.flat]
return reshaped_array
class S3Proxy(object):
def __init__(self, config):
self.config = config
self.__s3_bucketname = config.get("s3", "bucket")
self.__s3_region = config.get("s3", "region")
self.__s3 = boto3.resource('s3')
self.__nexus_tile = None
def fetch_nexus_tiles(self, *tile_ids):
tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
(isinstance(tile_id, str) or isinstance(tile_id, str))]
res = []
for tile_id in tile_ids:
obj = self.__s3.Object(self.__s3_bucketname, str(tile_id))
data = obj.get()['Body'].read()
nexus_tile = NexusTileData(data, str(tile_id))
res.append(nexus_tile)
return res