parquet_flask/io_logic/sub_collection_statistics.py (173 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from parquet_flask.io_logic.cdms_schema import CdmsSchema
from parquet_flask.io_logic.query_v2 import QueryProps
from parquet_flask.utils.file_utils import FileUtils
from parquet_flask.io_logic.cdms_constants import CDMSConstants
from parquet_flask.utils.config import Config
from parquet_flask.aws.es_factory import ESFactory
from parquet_flask.aws.es_abstract import ESAbstract
from parquet_flask.utils.time_utils import TimeUtils
LOGGER = logging.getLogger(__name__)
class SubCollectionStatistics:
def __init__(self, query_props: QueryProps):
config = Config()
self.__es: ESAbstract = ESFactory().get_instance('AWS',
index=CDMSConstants.es_index_parquet_stats,
base_url=config.get_value(Config.es_url),
port=int(config.get_value(Config.es_port, '443')))
self.__query_props = query_props
self.__insitu_schema = FileUtils.read_json(Config().get_value(Config.in_situ_schema))
self.__cdms_obs_names = CdmsSchema().get_observation_names(self.__insitu_schema)
def with_provider(self, provider: str):
self.__provider = provider
return self
def with_project(self, project: str):
self.__project = project
return self
def with_platforms(self, platform_code: list):
self.__platform_codes = platform_code
return self
def __restructure_core_stats(self, core_stats: dict):
"""
{
"key": "30",
"doc_count": 4724,
"min_lon": {
"value": 179.9308
},
"max_lat": {
"value": 80.5424
},
"max_datetime": {
"value": 1546300740
},
"max_lon": {
"value": 179.9996
},
"min_datetime": {
"value": 1546214460
},
"max_depth": {
"value": 6
},
"totals": {
"value": 14530387
},
"min_lat": {
"value": 80.5317
},
"min_depth": {
"value": 4
}
}
:param core_stats:
:return:
"""
core_stats = {
"platform": core_stats['key'],
"statistics": {
"total": core_stats['totals']['value'],
"min_lat_lon": [core_stats['min_lat']['value'], core_stats['min_lon']['value']],
"max_lat_lon": [core_stats['max_lat']['value'], core_stats['max_lon']['value']],
"min_depth": core_stats['min_depth']['value'],
"max_depth": core_stats['max_depth']['value'],
"min_datetime": TimeUtils.get_time_str(int(core_stats['min_datetime']['value']), in_ms=False),
"max_datetime": TimeUtils.get_time_str(int(core_stats['max_datetime']['value']), in_ms=False),
'observation_counts': {k: core_stats[k]['value'] for k in self.__cdms_obs_names}
}
}
LOGGER.debug(f'core_stats: {core_stats}')
return core_stats
def __restructure_stats(self, es_result: dict):
"""
{
"by_provider": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "Florida State University, COAPS",
"doc_count": 4724,
"by_project": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "SAMOS",
"doc_count": 4724,
"by_platform_code": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "30",
"doc_count": 4724,
"min_lon": {
"value": 179.9308
},
"max_lat": {
"value": 80.5424
},
"max_datetime": {
"value": 1546300740
},
"max_lon": {
"value": 179.9996
},
"min_datetime": {
"value": 1546214460
},
"max_depth": {
"value": 6
},
"totals": {
"value": 14530387
},
"min_lat": {
"value": 80.5317
},
"min_depth": {
"value": 4
}
}
]
}
}
]
}
}
]
}
}
:param es_result:
:return:
"""
restructured_stats = {
"providers": [
{
"provider": m['key'],
"projects": [
{
"project": l['key'],
"platforms": [
self.__restructure_core_stats(k) for k in l['by_platform_code']['buckets']
]
} for l in m['by_project']['buckets']
]
} for m in es_result['by_provider']['buckets']
]
}
LOGGER.debug(f'restructured_stats: {restructured_stats}')
return restructured_stats
def __get_observation_agg_stmts(self):
agg_stmts = {k: {
'sum': {
'field': f'observation_counts.{k}'
}
} for k in self.__cdms_obs_names}
return agg_stmts
def start(self):
es_terms = []
if self.__query_props.provider is not None:
es_terms.append({'term': {CDMSConstants.provider_col: self.__query_props.provider}})
if self.__query_props.project is not None:
es_terms.append({'term': {CDMSConstants.project_col: self.__query_props.project}})
if self.__query_props.platform_code is not None:
if isinstance(self.__query_props.platform_code, list):
es_terms.append({
'bool': {
'should': [
{'term': {CDMSConstants.platform_code_col: k}} for k in self.__query_props.platform_code
]
}
})
else:
es_terms.append({'term': {CDMSConstants.platform_code_col: self.__query_props.platform_code}})
if self.__query_props.min_depth is not None and self.__query_props.max_depth is not None:
es_terms.append({'range': {CDMSConstants.max_depth: {'gte': self.__query_props.min_depth}}})
es_terms.append({'range': {CDMSConstants.min_depth: {'lte': self.__query_props.max_depth}}})
if self.__query_props.min_datetime is not None and self.__query_props.max_datetime is not None:
es_terms.append({'range': {CDMSConstants.max_datetime: {'gte': self.__query_props.min_datetime}}})
es_terms.append({'range': {CDMSConstants.min_datetime: {'lte': self.__query_props.max_datetime}}})
if self.__query_props.min_lat_lon is not None and self.__query_props.max_lat_lon is not None:
es_terms.append({'range': {CDMSConstants.max_lat: {'gte': self.__query_props.min_lat_lon[0]}}})
es_terms.append({'range': {CDMSConstants.min_lat: {'lte': self.__query_props.max_lat_lon[0]}}})
es_terms.append({'range': {CDMSConstants.max_lon: {'gte': self.__query_props.min_lat_lon[1]}}})
es_terms.append({'range': {CDMSConstants.min_lon: {'lte': self.__query_props.max_lat_lon[1]}}})
normal_agg_stmts = {
"totals": {
"sum": {"field": "total"}}
,
"max_datetime": {
"max": {
"field": CDMSConstants.max_datetime
}
},
"max_depth": {
"max": {
"field": CDMSConstants.max_depth
}
},
"max_lat": {
"max": {
"field": CDMSConstants.max_lat
}
},
"max_lon": {
"max": {
"field": CDMSConstants.max_lon
}
},
"min_datetime": {
"min": {
"field": CDMSConstants.min_datetime
}
},
"min_depth": {
"min": {
"field": CDMSConstants.min_depth
}
},
"min_lat": {
"min": {
"field": CDMSConstants.min_lat
}
},
"min_lon": {
"min": {
"field": CDMSConstants.min_lon
}
}
}
stats_dsl = {
"size": 0,
"query": {
'bool': {
'must': es_terms
}
},
"aggs": {
"by_provider": {
"terms": {
"field": CDMSConstants.provider_col
},
"aggs": {
"by_project": {
"terms": {"field": CDMSConstants.project_col},
"aggs": {
"by_platform_code": {
"terms": {"field": CDMSConstants.platform_code_col},
"aggs": {**normal_agg_stmts, **self.__get_observation_agg_stmts()}
}
}
}
}
}
}
}
LOGGER.warning(f'es_dsl: {json.dumps(stats_dsl)}')
es_result = self.__es.query(stats_dsl, CDMSConstants.es_index_parquet_stats)
# statistics = {k: v['value'] for k, v in es_result['aggregations'].items()}
return self.__restructure_stats(es_result['aggregations'])