in parquet_flask/io_logic/sub_collection_statistics.py [0:0]
def start(self, terms_size: int = 1000):
    """
    Query the parquet statistics index and return per-sub-collection statistics.

    Builds an Elasticsearch query from ``self.__query_props`` (provider, project,
    platform code(s), and depth / datetime / lat-lon overlap ranges). The matching
    statistics documents are aggregated by provider -> project -> platform code.
    The raw aggregation result is handed to ``self.__restructure_stats``.

    :param terms_size: maximum number of buckets returned by each ``terms``
        aggregation level. Elasticsearch returns only the top 10 terms when
        ``size`` is omitted, which silently dropped sub-collections beyond the
        tenth provider / project / platform code.
    :return: whatever ``self.__restructure_stats`` builds from
        ``es_result['aggregations']``.
    """
    props = self.__query_props
    es_terms = []
    if props.provider is not None:
        es_terms.append({'term': {CDMSConstants.provider_col: props.provider}})
    if props.project is not None:
        es_terms.append({'term': {CDMSConstants.project_col: props.project}})
    if props.platform_code is not None:
        if isinstance(props.platform_code, list):
            # A bool query containing only `should` clauses requires at least one
            # of them to match, i.e. "platform code is any of these".
            es_terms.append({
                'bool': {
                    'should': [
                        {'term': {CDMSConstants.platform_code_col: k}} for k in props.platform_code
                    ]
                }
            })
        else:
            es_terms.append({'term': {CDMSConstants.platform_code_col: props.platform_code}})
    # Range filters are overlap tests: a stats document matches when its
    # [min, max] extent intersects the requested [min, max] window.
    # NOTE(review): each range is only applied when BOTH bounds are supplied;
    # a single bound on its own is ignored.
    if props.min_depth is not None and props.max_depth is not None:
        es_terms.append({'range': {CDMSConstants.max_depth: {'gte': props.min_depth}}})
        es_terms.append({'range': {CDMSConstants.min_depth: {'lte': props.max_depth}}})
    if props.min_datetime is not None and props.max_datetime is not None:
        es_terms.append({'range': {CDMSConstants.max_datetime: {'gte': props.min_datetime}}})
        es_terms.append({'range': {CDMSConstants.min_datetime: {'lte': props.max_datetime}}})
    if props.min_lat_lon is not None and props.max_lat_lon is not None:
        # min_lat_lon / max_lat_lon are indexed as [0] = lat, [1] = lon.
        es_terms.append({'range': {CDMSConstants.max_lat: {'gte': props.min_lat_lon[0]}}})
        es_terms.append({'range': {CDMSConstants.min_lat: {'lte': props.max_lat_lon[0]}}})
        es_terms.append({'range': {CDMSConstants.max_lon: {'gte': props.min_lat_lon[1]}}})
        es_terms.append({'range': {CDMSConstants.min_lon: {'lte': props.max_lat_lon[1]}}})

    # Per-platform-code metrics: summed document totals plus the overall
    # min / max extent of each indexed dimension inside the bucket.
    metric_aggs = (
        ('max_datetime', 'max', CDMSConstants.max_datetime),
        ('max_depth', 'max', CDMSConstants.max_depth),
        ('max_lat', 'max', CDMSConstants.max_lat),
        ('max_lon', 'max', CDMSConstants.max_lon),
        ('min_datetime', 'min', CDMSConstants.min_datetime),
        ('min_depth', 'min', CDMSConstants.min_depth),
        ('min_lat', 'min', CDMSConstants.min_lat),
        ('min_lon', 'min', CDMSConstants.min_lon),
    )
    normal_agg_stmts = {
        'totals': {'sum': {'field': 'total'}},
        **{agg_name: {agg_type: {'field': field}} for agg_name, agg_type, field in metric_aggs},
    }

    stats_dsl = {
        'size': 0,  # only the aggregations are needed, not the matching documents
        'query': {
            'bool': {
                'must': es_terms
            }
        },
        'aggs': {
            'by_provider': {
                'terms': {'field': CDMSConstants.provider_col, 'size': terms_size},
                'aggs': {
                    'by_project': {
                        'terms': {'field': CDMSConstants.project_col, 'size': terms_size},
                        'aggs': {
                            'by_platform_code': {
                                'terms': {'field': CDMSConstants.platform_code_col, 'size': terms_size},
                                'aggs': {**normal_agg_stmts, **self.__get_observation_agg_stmts()},
                            }
                        },
                    }
                },
            }
        },
    }
    # The generated DSL is diagnostic detail, not a warning condition.
    LOGGER.debug('es_dsl: %s', json.dumps(stats_dsl))
    es_result = self.__es.query(stats_dsl, CDMSConstants.es_index_parquet_stats)
    return self.__restructure_stats(es_result['aggregations'])