in detection_rules/misc.py [0:0]
def add_client(*client_type, add_to_ctx=True, add_func_arg=True):
"""Wrapper to add authed client."""
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import AuthenticationException
from kibana import Kibana
def _wrapper(func):
client_ops_dict = {}
client_ops_keys = {}
for c_type in client_type:
ops = client_options.get(c_type)
client_ops_dict.update(ops)
client_ops_keys[c_type] = list(ops)
if not client_ops_dict:
raise ValueError(f'Unknown client: {client_type} in {func.__name__}')
client_ops = list(client_ops_dict.values())
@wraps(func)
@add_params(*client_ops)
def _wrapped(*args, **kwargs):
ctx: click.Context = next((a for a in args if isinstance(a, click.Context)), None)
es_client_args = {k: kwargs.pop(k, None) for k in client_ops_keys.get('elasticsearch', [])}
# shared args like cloud_id
kibana_client_args = {k: kwargs.pop(k, es_client_args.get(k)) for k in client_ops_keys.get('kibana', [])}
if 'elasticsearch' in client_type:
# for nested ctx invocation, no need to re-auth if an existing client is already passed
elasticsearch_client: Elasticsearch = kwargs.get('elasticsearch_client')
try:
if elasticsearch_client and isinstance(elasticsearch_client, Elasticsearch) and \
elasticsearch_client.info():
pass
else:
elasticsearch_client = get_elasticsearch_client(**es_client_args)
except AuthenticationException:
elasticsearch_client = get_elasticsearch_client(**es_client_args)
if add_func_arg:
kwargs['elasticsearch_client'] = elasticsearch_client
if ctx and add_to_ctx:
ctx.obj['es'] = elasticsearch_client
if 'kibana' in client_type:
# for nested ctx invocation, no need to re-auth if an existing client is already passed
kibana_client: Kibana = kwargs.get('kibana_client')
if kibana_client and isinstance(kibana_client, Kibana):
try:
with kibana_client:
if kibana_client.version:
pass # kibana_client is valid and can be used directly
except (requests.HTTPError, AttributeError):
kibana_client = get_kibana_client(**kibana_client_args)
else:
# Instantiate a new Kibana client if none was provided or if the provided one is not usable
kibana_client = get_kibana_client(**kibana_client_args)
if add_func_arg:
kwargs['kibana_client'] = kibana_client
if ctx and add_to_ctx:
ctx.obj['kibana'] = kibana_client
return func(*args, **kwargs)
return _wrapped
return _wrapper