server/app/plugins/mailstats.py (71 lines of code) (raw):

#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""ASF Infrastructure Reporting Dashboard - Mail Transport Statistics Tasks"""
import asyncio
import json
import time

import aiohttp

from ..lib import config
from .. import plugins

# Entries older than this many seconds are dropped when collating.
_MAX_AGE_SECONDS = 86400
# Pause between two consecutive scans, in seconds.
_SCAN_INTERVAL_SECONDS = 300

# Latest scan result: one key per host that answered, plus "collated".
_stats: dict = {}


def get_stats():
    """Return the module-level stats dict filled in by the last mail_scan()."""
    return _stats


def trim_stats(stats):
    """Trims the stats, removing items that we do not currently use, shortening syntax for other items.

    Each input entry must provide "timestamp", "recipients" and "senders",
    where the latter two map a name to a dict holding a "pending" count.

    Returns a list with one dict per entry, carrying the keys "ts",
    "pending" (the larger of the recipient total and the sender total),
    "pending_by_recipient" and "pending_by_sender".
    """
    trimmed_stats = []
    for entry in stats:
        by_recipient = {name: item["pending"] for name, item in entry["recipients"].items()}
        by_sender = {name: item["pending"] for name, item in entry["senders"].items()}
        trimmed_stats.append({
            "ts": entry["timestamp"],
            "pending": max(sum(by_recipient.values()), sum(by_sender.values())),
            "pending_by_recipient": by_recipient,
            "pending_by_sender": by_sender,
        })
    return trimmed_stats


def _add_counts(totals, counts):
    """Add every value of ``counts`` onto the matching key of ``totals``, in place."""
    for name, pending in counts.items():
        totals[name] = totals.get(name, 0) + pending


def collate_stats(*stats):
    """Collates (sums up) stats from all hosts into one unified, global stat.

    Each positional argument is a list of trimmed entries as produced by
    trim_stats(). Entries whose "ts" is older than 24 hours are skipped.
    Entries sharing a timestamp are summed, both for the overall "pending"
    count and per recipient / per sender.

    Returns a list of dicts with the keys "ts" (int), "pending",
    "pending_by_recipient" and "pending_by_sender", in first-seen order of
    the timestamps.
    """
    pending_by_recipient = {}
    pending_by_sender = {}
    pending_count = {}
    cutoff = int(time.time() - _MAX_AGE_SECONDS)  # Only grab stats if from less than 24h ago
    for stat in stats:
        for entry in stat:
            if entry["ts"] < cutoff:
                continue
            # Bucket on the integer timestamp itself. Going through str() and
            # back through int() fails for float timestamps, since
            # int("1700000000.5") raises ValueError.
            ts = int(entry["ts"])
            _add_counts(pending_by_recipient.setdefault(ts, {}), entry["pending_by_recipient"])
            _add_counts(pending_by_sender.setdefault(ts, {}), entry["pending_by_sender"])
            pending_count[ts] = pending_count.get(ts, 0) + entry["pending"]
    return [
        {
            "ts": ts,
            "pending": pending_count[ts],
            "pending_by_recipient": pending_by_recipient[ts],
            "pending_by_sender": pending_by_sender[ts],
        }
        for ts in pending_count
    ]


async def mail_scan():
    """Grabs mxout statistics from all hosts, collates it.

    Hosts come from the "hosts" entry of config.reporting.mailstats. For each
    one, qshape.json is fetched over HTTP from port 8083 and trimmed. A host
    that cannot be reached, answers with a non-200 status, or returns an
    unusable payload is reported on stdout and left out of the result.

    On completion the module-level stats dict is replaced with the per-host
    results plus a "collated" entry summing all hosts.
    """
    mxout_stats = {}
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as hc:
        for hostname in config.reporting.mailstats.get("hosts", []):
            try:
                async with hc.get(f"http://{hostname}:8083/qshape.json") as req:
                    if req.status == 200:
                        mxout_stats[hostname] = trim_stats(await req.json())
                    else:
                        print(f"Could not fetch JSON from {hostname}: HTTP status {req.status}")
            except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError) as e:
                print(f"Could not fetch JSON from {hostname}: {e}")
            except (KeyError, TypeError, AttributeError) as e:
                # Valid JSON with an unexpected shape: skip this host instead
                # of letting the error end the whole scan.
                print(f"Unexpected JSON structure from {hostname}: {e!r}")

    # Collate before touching the shared dict, so that an exception here
    # leaves the previous snapshot in place.
    collated = collate_stats(*mxout_stats.values())
    _stats.clear()
    _stats.update(mxout_stats)
    _stats["collated"] = collated


async def scan_loop():
    """Run mail_scan() forever, sleeping five minutes between runs."""
    while True:
        await mail_scan()
        await asyncio.sleep(_SCAN_INTERVAL_SECONDS)


plugins.root.register(
    scan_loop,
    slug="mailstats",
    title="Mail Transport Statistics",
    icon="bi-envelope-exclamation-fill",
    private=True,
)