eventdata/runners/kibana_runner.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import logging
logger = logging.getLogger("track.eventdata")


def extract_error_details(error_details, data):
    """
    Adds the HTTP status and error reason of a single msearch response item to the ``error_details`` set.
    """
    error_data = data.get("error", {})
    error_reason = error_data.get("reason") if isinstance(error_data, dict) else str(error_data)
    if error_data:
        error_details.add((data["status"], error_reason))
    else:
        error_details.add((data["status"], None))


def error_description(error_details):
    """
    Renders the collected ``(status, reason)`` tuples as a single, human-readable description.
    """
    descriptions = []
    for status, reason in error_details:
        if reason:
            descriptions.append("HTTP status: %s, message: %s" % (status, reason))
        else:
            descriptions.append("HTTP status: %s" % status)
    return ", ".join(descriptions)


async def kibana(es, params):
    """
    Simulates the msearch queries that a Kibana dashboard issues.

    It expects the parameter dict to contain the following keys:

    "body"      - msearch request body representing the Kibana dashboard as a list of dicts.
    "params"    - msearch request parameters.
    "meta_data" - dict with metadata that is carried through into the metrics record; it must include a
                  boolean "debug" key which controls request/response logging.
    """
request = params["body"]
request_params = params["params"]
meta_data = params["meta_data"]
if meta_data["debug"]:
logger.info("Request:\n=====\n{}\n=====".format(json.dumps(request)))
    # each visualisation on the dashboard contributes one header line and one body line to the msearch request
    visualisations = len(request) // 2

    # the returned dict becomes the metrics record for this request, so carry the metadata through
    response = dict(meta_data)
    response["request_params"] = request_params
    response["weight"] = 1
    response["unit"] = "ops"
    response["visualisation_count"] = visualisations
result = await es.msearch(body=request, params=request_params)
sum_hits = 0
max_took = 0
error_count = 0
error_details = set()
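    # each entry in result["responses"] corresponds to one search in the msearch body: on failure it carries a
    # "status" and an "error" object, on success the usual search response including "took" and "hits" (where
    # "hits.total" is an object with a "value" field from Elasticsearch 7.0 onwards and a plain integer before that)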
for r in result["responses"]:
if "error" in r:
error_count += 1
extract_error_details(error_details, r)
else:
hits = r.get("hits", {}).get("total", 0)
if isinstance(hits, dict):
sum_hits += hits["value"]
else:
sum_hits += hits
max_took = max(max_took, r["took"])
    # prefer the overall "took" reported by msearch; if it is absent, approximate it with the maximum across the individual responses
response["took"] = result.get("took", max_took)
response["hits"] = sum_hits
response["success"] = error_count == 0
response["error-count"] = error_count
if error_count > 0:
response["error-type"] = "kibana"
response["error-description"] = error_description(error_details)
if meta_data["debug"]:
for r in result["responses"]:
            # clear hits and aggregations, otherwise we'd spam the log
if "hits" in r and "hits" in r["hits"]:
r["hits"]["hits"] = []
r["aggregations"] = {}
logger.info("Response (excluding specific hits):\n=====\n{}\n=====".format(json.dumps(result)))
return response
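# For reference, a track plugin would typically expose this runner via Rally's custom runner registry roughly as
# sketched below (illustrative only; the actual registration lives in the track's track.py, and the runner name
# "kibana" is an assumption):
#
#     from eventdata.runners import kibana_runner
#
#     def register(registry):
#         registry.register_runner("kibana", kibana_runner.kibana, async_runner=True)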