in tools/plugins/elastic.py [0:0]
def __init__(self, logger_level=None, trace_level=None, is_async=False):
# Fetch config
config = ponymailconfig.PonymailConfig()
# Set default names for all indices we use
dbname = config.get('elasticsearch', 'dbname', fallback='ponymail')
self.dbname = dbname
self.db_mbox = dbname + '-mbox'
self.db_source = dbname + '-source'
self.db_account = dbname + '-account'
self.db_attachment = dbname + '-attachment'
self.db_session = dbname + '-session'
self.db_notification = dbname + '-notification'
self.db_auditlog = dbname + '-auditlog'
self.db_version = 0
self.is_async = is_async
dburl = config.get('elasticsearch', 'dburl', fallback=None)
if not dburl:
ssl = config.get('elasticsearch', 'ssl', fallback=False)
uri = config.get('elasticsearch', 'uri', fallback='')
auth = None
if config.has_option('elasticsearch', 'user'):
auth = (
config.get('elasticsearch', 'user'),
config.get('elasticsearch', 'password')
)
dburl = {
"host": config.get('elasticsearch', 'hostname', fallback='localhost'),
"port": config.get('elasticsearch', 'port', fallback=9200),
"use_ssl": ssl,
"url_prefix": uri,
"auth": auth,
"ca_certs": certifi.where(),
}
# Always allow this to be set; will be replaced as necessary by wait_for_active_shards
self.consistency = config.get("elasticsearch", "write", fallback="quorum")
if logger_level:
eslog = logging.getLogger("elasticsearch")
eslog.setLevel(logger_level)
eslog.addHandler(logging.StreamHandler())
else:
# elasticsearch logs lots of warnings on retries/connection failure
logging.getLogger("elasticsearch").setLevel(logging.ERROR)
if trace_level:
trace = logging.getLogger("elasticsearch.trace")
trace.setLevel(trace_level)
trace.addHandler(logging.StreamHandler())
if self.is_async:
self.es = AsyncElasticsearch(
[
dburl
],
max_retries=5,
retry_on_timeout=True,
)
else:
self.es = Elasticsearch(
[
dburl
],
max_retries=5,
retry_on_timeout=True,
)
# This won't work with async, so for now we'll ignore it there...
es_engine_major = self.engineMajor()
if es_engine_major in [7, 8]:
self.wait_for_active_shards = config.get("elasticsearch", "wait", fallback=1)
else:
raise Exception("Unexpected elasticsearch version ", es_engine_major)
# Mimic ES hierarchy: es.indices.xyz()
self.indices = _indices_wrap(self)