parquet_flask/io_logic/parquet_paths_es_retriever.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
from collections import defaultdict
from typing import List, Optional

from parquet_flask.aws.es_factory import ESFactory
from parquet_flask.aws.es_abstract import ESAbstract
from parquet_flask.io_logic.cdms_constants import CDMSConstants
from parquet_flask.io_logic.partitioned_parquet_path import PartitionedParquetPath
from parquet_flask.io_logic.query_v2 import QueryProps
from parquet_flask.utils.time_utils import TimeUtils

LOGGER = logging.getLogger(__name__)


class ParquetPathsEsRetriever:
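    """
    Retrieves partitioned parquet file paths by querying an Elasticsearch index of
    per-file statistics documents and converting each hit into a PartitionedParquetPath.

    Illustrative usage sketch (the endpoint, port and base path below are placeholder
    values, not defined by this module):

        retriever = ParquetPathsEsRetriever('s3://example-bucket/base', props=QueryProps())
        retriever.load_es_from_config('https://example-es-endpoint', 'parquet_stats_v1', 443)
        parquet_paths = retriever.start()
    """
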
    def __init__(self, base_path: str, props: QueryProps = None):
        self.__base_path = base_path
        self.__props = props if props is not None else QueryProps()
        self.__es: Optional[ESAbstract] = None

def load_es_obj(self, es: ESAbstract):
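        """Use an externally constructed Elasticsearch client."""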
self.__es = es
return self

    def load_es_from_config(self, es_url: str, es_index: str, es_port: int):
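        """Create the Elasticsearch client from endpoint configuration (AWS-flavored client via ESFactory)."""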
self.__es: ESAbstract = ESFactory().get_instance('AWS', index=es_index, base_url=es_url, port=es_port)
return self

    def __step_1(self, es_results: List[PartitionedParquetPath]):
        """Placeholder: group results by provider / project / platform / lat-lon partition."""
        base_map = defaultdict(list)
        for each in es_results:
            base_key = f'{each.provider}_{each.project}_{each.platform}_{each.lat_lon}'
            base_map[base_key].append(each)
        raise NotImplementedError('planned enhancement to reduce the number of tiny parquet paths')

def start(self):
"""
{
"_index": "parquet_stats_v1",
"_type": "_doc",
"_id": "part-00000-9cfcbe81-3ca9-4084-9b8c-db451bd8c076.c000.gz.parquet",
"_score": 1,
"_source": {
"s3_url": "s3://cdms-dev-in-situ-parquet/CDMS_insitu.geo2.parquet/provider=Florida State University, COAPS/project=SAMOS/platform_code=30/geo_spatial_interval=-25_150/year=2017/month=6/job_id=6f33d0e5-65ca-4281-b4df-2d703adee683/part-00000-9cfcbe81-3ca9-4084-9b8c-db451bd8c076.c000.gz.parquet",
"bucket": "cdms-dev-in-situ-parquet",
"name": "part-00000-9cfcbe81-3ca9-4084-9b8c-db451bd8c076.c000.gz.parquet",
"provider": "Florida State University, COAPS",
"project": "SAMOS",
"platform_code": "30",
"geo_spatial_interval": "-25_150",
"year": "2017",
"month": "6",
"total": 8532,
"min_datetime": 1497312000,
"max_datetime": 1497398340,
"min_depth": -31.5,
"max_depth": 5.9,
"min_lat": -23.8257,
"max_lat": -23.6201,
"min_lon": 154.4868,
"max_lon": 154.6771
}
        }
        :return: list of PartitionedParquetPath objects built from each matching document's _source
        """
        if self.__es is None:
            raise ValueError('ES object is not loaded. Call load_es_obj() or load_es_from_config() first.')
es_terms = []
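        # Build term / range filters only for the query properties that were provided.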
if self.__props.provider is not None:
es_terms.append({'term': {CDMSConstants.provider_col: self.__props.provider}})
if self.__props.project is not None:
es_terms.append({'term': {CDMSConstants.project_col: self.__props.project}})
if self.__props.platform_code is not None:
if isinstance(self.__props.platform_code, list):
es_terms.append({
'bool': {
'should': [
{'term': {CDMSConstants.platform_code_col: k}} for k in self.__props.platform_code
]
}
})
else:
es_terms.append({'term': {CDMSConstants.platform_code_col: self.__props.platform_code}})
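        # Time filter: a parquet file matches when its [min_datetime, max_datetime] interval
        # overlaps the requested window, i.e. file.max >= query.min and file.min <= query.max.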
if self.__props.min_datetime is not None:
es_terms.append({'range': {'max_datetime': {'gte': TimeUtils.get_datetime_obj(self.__props.min_datetime).timestamp()}}})
if self.__props.max_datetime is not None:
es_terms.append({'range': {'min_datetime': {'lte': TimeUtils.get_datetime_obj(self.__props.max_datetime).timestamp()}}})
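        # Spatial filter: the file's bounding box must overlap the requested box
        # (file max lat/lon >= query minimums and file min lat/lon <= query maximums).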
if self.__props.min_lat_lon is not None:
es_terms.append({'range': {'max_lat': {'gte': self.__props.min_lat_lon[0]}}})
es_terms.append({'range': {'max_lon': {'gte': self.__props.min_lat_lon[1]}}})
if self.__props.max_lat_lon is not None:
es_terms.append({'range': {'min_lat': {'lte': self.__props.max_lat_lon[0]}}})
es_terms.append({'range': {'min_lon': {'lte': self.__props.max_lat_lon[1]}}})
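        # AND all filters together and sort so matching paths come back in a stable
        # time / platform / location / path order.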
es_dsl = {
'query': {
'bool': {
'must': es_terms
}
},
'sort': [
{'min_datetime': {'order': 'asc'}},
{CDMSConstants.platform_code_col: {'order': 'asc'}},
{'min_lat': {'order': 'asc'}},
{'min_lon': {'order': 'asc'}},
{'s3_url': {'order': 'asc'}},
]
}
LOGGER.warning(f'es_dsl: {json.dumps(es_dsl)}')
        es_result = self.__es.query_pages(es_dsl)
        parquet_paths = [PartitionedParquetPath(self.__base_path).load_from_es(k['_source']) for k in es_result['items']]
        return parquet_paths