in granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py [0:0]
def process(self, tile, dataset, *args, **kwargs):
    """Fill in ``tile.summary`` with bounding box, statistics and variable names.

    The summary is built from the latitude, longitude and variable data held
    by whichever ``tile_type`` member of ``tile.tile`` is set, and is then
    copied back onto ``tile``.

    Args:
        tile: Message exposing ``tile`` (with a ``tile_type`` oneof) and
            ``summary``; it is updated in place.
        dataset: Object whose ``variables[name].attrs`` mapping is queried
            for each variable's ``standard_name``.
        *args: Accepted for interface compatibility; unused.
        **kwargs: Accepted for interface compatibility; unused.

    Returns:
        The same ``tile`` object, with its summary populated.

    Raises:
        Whatever ``json.loads`` raises if ``summary.data_var_name`` is not
        valid JSON. A failure inside the grid-tile mean calculation is NOT
        raised: it is logged and the mean is set to 0.
    """
    tile_type = tile.tile.WhichOneof("tile_type")
    logger.debug('processing granule: %s', tile.summary.granule)
    tile_data = getattr(tile.tile, tile_type)

    # Coordinates are masked where invalid; the variable data is used as-is.
    latitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude))
    longitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude))
    data = from_shaped_array(tile_data.variable_data)
    logger.debug('retrieved lat, long, data')

    tile_summary = tile.summary if tile.HasField("summary") else nexusproto.TileSummary()
    logger.debug('retrieved summary')

    tile_summary.dataset_name = self._dataset_name
    tile_summary.bbox.lat_min = numpy.nanmin(latitudes).item()
    tile_summary.bbox.lat_max = numpy.nanmax(latitudes).item()
    tile_summary.bbox.lon_min = numpy.nanmin(longitudes).item()
    tile_summary.bbox.lon_max = numpy.nanmax(longitudes).item()

    # Build the NaN mask once and reduce it inside numpy, rather than
    # iterating a flattened copy element by element in the interpreter.
    # NOTE(review): assumes `data` is a plain (unmasked) ndarray - confirm
    # against from_shaped_array.
    nan_mask = numpy.isnan(data)
    if nan_mask.all():
        # nanmin/nanmax are skipped when there is no valid value at all.
        tile_summary.stats.min = numpy.nan
        tile_summary.stats.max = numpy.nan
    else:
        tile_summary.stats.min = numpy.nanmin(data).item()
        tile_summary.stats.max = numpy.nanmax(data).item()
    # Number of non-NaN values, stored as a plain Python int.
    tile_summary.stats.count = int(data.size - numpy.count_nonzero(nan_mask))
    logger.debug('set summary fields')

    # NOTE(review): when the tile carried no summary, data_var_name is
    # presumably the message default (an empty string) and json.loads would
    # raise - confirm an upstream step always populates it.
    data_var_name = json.loads(tile_summary.data_var_name)
    if not isinstance(data_var_name, list):
        data_var_name = [data_var_name]

    # In order to accurately calculate the average we need to weight the data
    # based on the cosine of its latitude.
    # This is handled slightly differently for swath vs. grid data.
    if tile_type == 'swath_tile':
        # For Swath tiles, len(data) == len(latitudes) == len(longitudes).
        # So we can simply weight each element in the data array.
        tile_summary.stats.mean = type(self).calculate_mean_for_swath_tile(data, latitudes)
    elif tile_type == 'grid_tile':
        # Grid tiles need to repeat the weight for every longitude.
        # TODO This assumes data axis' are ordered as latitude x longitude
        logger.debug('set grid mean. tile_summary.data_var_name: %s', tile_summary.data_var_name)
        try:
            tile_summary.stats.mean = type(self).calculate_mean_for_grid_tile(
                data, latitudes, longitudes, len(data_var_name))
        except Exception as e:
            # Deliberate best effort: record the failure (with traceback)
            # and fall back to 0 instead of aborting the tile.
            logger.exception('error while setting grid mean: %s', e)
            tile_summary.stats.mean = 0
    else:
        # Default to simple average with no weighting
        tile_summary.stats.mean = numpy.nanmean(data).item()

    logger.debug('find min max time')
    try:
        min_time, max_time = find_time_min_max(tile_data)
        logger.debug('set min max time')
        tile_summary.stats.min_time = min_time
        tile_summary.stats.max_time = max_time
    except NoTimeException:
        # Tiles without time information keep their existing time stats.
        pass

    logger.debug('calc standard_name')
    # Variables lacking a 'standard_name' attribute contribute None (JSON null).
    standard_names = [dataset.variables[k].attrs.get('standard_name') for k in data_var_name]
    logger.debug('using standard_names: %s', standard_names)
    tile_summary.standard_name = json.dumps(standard_names)

    logger.debug('copy tile_summary to tile')
    tile.summary.CopyFrom(tile_summary)
    return tile