scripts/stats.py (84 lines of code) (raw):

"""Download-statistics endpoint.

Aggregates per-URL request counts, unique visitors, byte traffic, and
per-country unique-visitor counts for a project's download artifacts,
querying both the fastly and loggy Elasticsearch indices.
"""
import re
import typing

import ahapi
import elasticsearch
import elasticsearch_dsl

# Maximum number of terms buckets to return per aggregation.
MAX_HITS = 50

# Different indices have different field names, account for it here:
FIELD_NAMES = {
    "fastly": {
        "geo_country": "geo_country_code",
        "bytes": "response_body_size",
        "vhost": "vhost",
        "uri": "url",
        "timestamp": "timestamp",
        "_vhost_": "dlcdn.apache.org",
        "request_method": "request",
    },
    "loggy": {
        "geo_country": "geo_country",
        "bytes": "bytes",
        "vhost": "vhost",
        "uri": "uri",
        "timestamp": "@timestamp",
        "_vhost_": "downloads.apache.org",
        "request_method": "request_method",
    },
}

es_client = elasticsearch.AsyncElasticsearch(hosts=["http://localhost:9200/"], timeout=45)


async def process(state: typing.Any, request, formdata: dict) -> dict:
    """Compute download statistics for a project over a time span.

    Form data:
        duration: Elasticsearch date-math span, e.g. "30d" (default "30d").
        project:  project name used to filter and strip download URIs
                  (default "netbeans").

    Returns a dict with the time span and four mappings:
        by_requests:        URL -> total GET requests
        by_requests_unique: URL -> unique client IPs
        by_bytes:           URL -> total bytes served
        by_country:         country code -> unique client IPs
    """
    duration = formdata.get("duration", "30d")
    project = formdata.get("project", "netbeans")

    # FIX: escape the user-supplied project name so regex metacharacters
    # (".", "+", ...) cannot alter the pattern. Compiled once, used in the
    # per-bucket loops below. The pattern is anchored, so it substitutes at
    # most once (the original count=100 was a no-op).
    strip_prefix = re.compile(r"^/?" + re.escape(project) + "/")

    downloads_by_requests: dict = {}
    downloads_by_requests_unique: dict = {}
    downloads_by_traffic: dict = {}
    downloads_by_country: dict = {}

    for provider, field_names in FIELD_NAMES.items():
        q = elasticsearch_dsl.Search(using=es_client)
        q = q.filter("range", **{field_names["timestamp"]: {"gte": f"now-{duration}"}})
        q = q.filter("match", **{field_names["request_method"]: "GET"})
        # FIX: the size filter must use the provider-specific byte-count
        # field (fastly calls it "response_body_size"); the previous
        # hard-coded "bytes" field matched nothing for fastly.
        q = q.filter("range", **{field_names["bytes"]: {"gt": 5000}})
        q = q.filter("match", **{field_names["uri"]: project})
        # Only count URIs that look like files (have an extension).
        q = q.filter("regexp", **{field_names["uri"] + ".keyword": r".*\.[a-z0-9]+"})
        q = q.filter("match", **{field_names["vhost"]: field_names["_vhost_"]})

        # Top URLs by unique visitors.
        q.aggs.bucket(
            "request_per_url",
            elasticsearch_dsl.A("terms", field=f"{field_names['uri']}.keyword", size=MAX_HITS),
        ).metric("unique_ips", "cardinality", field="client_ip.keyword") \
            .pipeline("product_by_unique", "bucket_sort", sort=[{"unique_ips": "desc"}])

        # Countries by unique visitors.
        q.aggs.bucket(
            "by_country", "terms", field=f"{field_names['geo_country']}.keyword", size=MAX_HITS
        ).metric("unique_ips", "cardinality", field="client_ip.keyword") \
            .pipeline("product_by_unique", "bucket_sort", sort=[{"unique_ips": "desc"}])

        # Top URLs by bytes served.
        q.aggs.bucket(
            "requests_by_traffic",
            elasticsearch_dsl.A(
                "terms",
                field=f"{field_names['uri']}.keyword",
                size=MAX_HITS,
                order={"bytes_sum": "desc"},
            ),
        ).metric("bytes_sum", "sum", field=field_names["bytes"])

        # size=0: we only need the aggregations, not the individual hits.
        resp = await es_client.search(
            index=f"{provider}-*", body=q.to_dict(), size=0, timeout="60s"
        )
        if "aggregations" not in resp:
            # Skip this provider if no data is available
            continue

        for entry in resp["aggregations"]["requests_by_traffic"]["buckets"]:
            if "bytes_sum" in entry:
                url = re.sub(r"/+", "/", entry["key"])  # collapse duplicate slashes
                url = strip_prefix.sub("", url)
                no_bytes = int(entry["bytes_sum"]["value"])
                if no_bytes:
                    downloads_by_traffic[url] = downloads_by_traffic.get(url, 0) + no_bytes

        for entry in resp["aggregations"]["request_per_url"]["buckets"]:
            url = re.sub(r"/+", "/", entry["key"])
            url = strip_prefix.sub("", url)
            # Only keep file-like URLs (has an extension, not a directory).
            if "." in url and not url.endswith("/"):
                visits = int(entry["doc_count"])
                visits_unique = int(entry["unique_ips"]["value"])
                downloads_by_requests[url] = downloads_by_requests.get(url, 0) + visits
                downloads_by_requests_unique[url] = (
                    downloads_by_requests_unique.get(url, 0) + visits_unique
                )

        for entry in resp["aggregations"]["by_country"]["buckets"]:
            cca2 = entry["key"]
            # FIX: the original tested the literal string 'cca2' (always
            # truthy); the intent is to skip empty and unknown ("-") codes.
            if cca2 and cca2 != "-":
                visits = int(entry["unique_ips"]["value"])
                downloads_by_country[cca2] = downloads_by_country.get(cca2, 0) + visits

    return {
        "timespan": duration,
        "by_requests": downloads_by_requests,
        "by_requests_unique": downloads_by_requests_unique,
        "by_bytes": downloads_by_traffic,
        "by_country": downloads_by_country,
    }


def register(state: typing.Any):
    """Register this handler as an ahapi endpoint."""
    return ahapi.endpoint(process)