# parquet_flask/aws/es_middleware.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from elasticsearch import Elasticsearch

from parquet_flask.aws.es_abstract import ESAbstract, DEFAULT_TYPE

LOGGER = logging.getLogger(__name__)

class ESMiddleware(ESAbstract):
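    """Thin middleware over the elasticsearch-py client, bound to a default index."""
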
def __init__(self, index, base_url, port=443) -> None:
        if any(k is None for k in (index, base_url)):
            raise ValueError('index and base_url must not be None')
        self.__index = index
        base_url = base_url.replace('https://', '')  # strip the scheme; the client expects a bare host name
        self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port}])
def __validate_index(self, index):
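        """Return the explicit index if provided, else the default index; raise ValueError if neither is set."""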
if index is not None:
return index
if self.__index is not None:
return self.__index
raise ValueError('index value is NULL')
def __get_doc_dict(self, docs=None, doc_ids=None, doc_dict=None):
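        """Build a {doc_id: doc} dict from an explicit dict or from parallel docs/doc_ids lists."""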
if doc_dict is None and (docs is None and doc_ids is None):
raise ValueError('must provide either doc dictionary or doc list & id list')
        if doc_dict is None:  # docs and doc_ids were provided as parallel lists
            if len(docs) != len(doc_ids):
                raise ValueError('length of docs and doc_ids is different')
            doc_dict = {k: v for k, v in zip(doc_ids, docs)}
        return doc_dict
def __check_errors_for_bulk(self, index_result):
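        """Extract per-document errors from a bulk response; return them as a list, or None when there are none."""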
if 'errors' not in index_result or index_result['errors'] is False:
return
        err_list = [{'id': v['_id'], 'error': v['error']}
                    for each in index_result['items']
                    for _, v in each.items() if 'error' in v]
        if len(err_list) < 1:
            return
        LOGGER.error('failed to add some items. details: {}'.format(err_list))
        return err_list
def create_index(self, index_name, index_body):
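        """Create an index with the given name and body (settings/mappings)."""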
result = self._engine.indices.create(index=index_name, body=index_body, include_type_name=True)
return result
def index_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
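        """Bulk-index documents; return a list of per-document errors, or None if all succeeded."""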
        doc_dict = self.__get_doc_dict(docs, doc_ids, doc_dict)
        index = self.__validate_index(index)
        body = []
        for k, v in doc_dict.items():
            body.append({'index': {'_index': index, '_id': k, 'retry_on_conflict': 3}})
            body.append(v)
        try:
            index_result = self._engine.bulk(index=index, body=body, doc_type=DEFAULT_TYPE)
            LOGGER.info('indexed. result: {}'.format(index_result))
        except Exception as e:
            LOGGER.exception('cannot add indices with ids: {} for index: {}'.format(list(doc_dict.keys()), index))
            raise RuntimeError(f'one or more failed. details: {str(e)} doc_dict: {doc_dict}')
        return self.__check_errors_for_bulk(index_result)
def index_one(self, doc, doc_id, index=None):
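        """Index a single document under doc_id; raise RuntimeError on failure."""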
index = self.__validate_index(index)
try:
index_result = self._engine.index(index=index,
body=doc, doc_type=DEFAULT_TYPE, id=doc_id)
LOGGER.info('indexed. result: {}'.format(index_result))
except Exception as e:
LOGGER.exception('cannot add a new index with id: {} for index: {}'.format(doc_id, index))
raise RuntimeError(f'failed to add {index}:{doc_id}. cause: {str(e)}')
return self
def update_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
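        """Bulk-upsert documents; return per-document errors, the input dict if the bulk call itself fails, or None."""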
        doc_dict = self.__get_doc_dict(docs, doc_ids, doc_dict)
        index = self.__validate_index(index)
        body = []
        for k, v in doc_dict.items():
            body.append({'update': {'_index': index, '_id': k, 'retry_on_conflict': 3}})
            body.append({'doc': v, 'doc_as_upsert': True})
        try:
            index_result = self._engine.bulk(index=index, body=body, doc_type=DEFAULT_TYPE)
            LOGGER.info('updated. result: {}'.format(index_result))
        except Exception:
            LOGGER.exception('cannot update indices with ids: {} for index: {}'.format(list(doc_dict.keys()), index))
            return doc_dict
        return self.__check_errors_for_bulk(index_result)
def update_one(self, doc, doc_id, index=None):
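        """Upsert a single document under doc_id; raise RuntimeError on failure."""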
update_body = {
'doc': doc,
'doc_as_upsert': True
}
index = self.__validate_index(index)
try:
update_result = self._engine.update(index=index,
id=doc_id, body=update_body, doc_type=DEFAULT_TYPE)
LOGGER.info('updated. result: {}'.format(update_result))
except Exception as e:
LOGGER.exception('cannot update id: {} for index: {}'.format(doc_id, index))
raise RuntimeError(f'failed to update {index}:{doc_id}. cause: {str(e)}')
return self
@staticmethod
def get_result_size(result):
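        """Return the total hit count, handling both the int and dict forms of hits.total."""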
if isinstance(result['hits']['total'], dict): # fix for different datatype in elastic-search result
return result['hits']['total']['value']
else:
return result['hits']['total']
def query_with_scroll(self, dsl, querying_index=None):
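        """Run the query with a 30s scroll and keep scrolling until every hit is merged into one response."""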
scroll_timeout = '30s'
index = self.__validate_index(querying_index)
dsl['size'] = 10000 # replacing with the maximum size to minimize number of scrolls
params = {
'index': index,
'size': 10000,
'scroll': scroll_timeout,
'body': dsl,
}
first_batch = self._engine.search(**params)
total_size = self.get_result_size(first_batch)
current_size = len(first_batch['hits']['hits'])
scroll_id = first_batch['_scroll_id']
while current_size < total_size: # need to scroll
scrolled_result = self._engine.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = scrolled_result['_scroll_id']
scrolled_result_size = len(scrolled_result['hits']['hits'])
if scrolled_result_size == 0:
break
else:
current_size += scrolled_result_size
first_batch['hits']['hits'].extend(scrolled_result['hits']['hits'])
return first_batch
def query(self, dsl, querying_index=None):
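        """Run the query DSL once against the index and return the raw response."""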
index = self.__validate_index(querying_index)
return self._engine.search(body=dsl, index=index)
def query_pages(self, dsl, querying_index=None) -> dict:
"""
:param dsl:
:param querying_index:
:return: dict | {"total": 0, "items": []}
"""
if 'sort' not in dsl:
raise ValueError('missing `sort` in DSL. Make sure sorting is unique')
index = self.__validate_index(querying_index)
dsl['size'] = 10000 # replacing with the maximum size to minimize number of scrolls
params = {
'index': index,
'size': 10000,
'body': dsl,
}
LOGGER.debug(f'dsl: {dsl}')
first_batch = self._engine.search(**params)
current_size = len(first_batch['hits']['hits'])
total_size = current_size
while current_size > 0:
dsl['search_after'] = first_batch['hits']['hits'][-1]['sort']
paged_result = self._engine.search(**params)
current_size = len(paged_result['hits']['hits'])
total_size += current_size
first_batch['hits']['hits'].extend(paged_result['hits']['hits'])
return {
'total': len(first_batch['hits']['hits']),
'items': first_batch['hits']['hits'],
}
def query_by_id(self, doc_id, index=None):
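        """Return the first hit whose _id matches doc_id, or None if there is no match."""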
index = self.__validate_index(index)
dsl = {
'query': {
'term': {'_id': doc_id}
}
}
result = self._engine.search(index=index, body=dsl)
if self.get_result_size(result) < 1:
return None
return result['hits']['hits'][0]
def delete_by_id(self, doc_id, index=None):
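        """Delete a document by id; return False if the id is not found, True on success."""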
index = self.__validate_index(index)
try:
            self._engine.delete(index=index, id=doc_id)
except Exception as e:
if getattr(e, 'status_code', 0) == 404:
LOGGER.info(f'deleting ID not found: {doc_id}')
return False
LOGGER.exception(f'error while deleting {doc_id}')
raise e
return True
def delete_by_query(self, dsl, index=None):
raise NotImplementedError('not yet.')
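

# Minimal usage sketch: the host and index names below are illustrative placeholders,
# not values taken from this project's configuration.
if __name__ == '__main__':
    es = ESMiddleware(index='sample_index', base_url='https://search.example.com', port=443)
    es.index_one({'field': 'value'}, doc_id='doc-1')
    print(es.query_by_id('doc-1'))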