src/alibabacloud_rds_openapi_mcp_server/utils.py (114 lines of code) (raw):

"""Shared helpers for the RDS OpenAPI MCP server.

Contains the per-engine performance-key alias table, time-format
conversions, a CSV serializer for lists of result records, and factories
for the Alibaba Cloud RDS, VPC and BSS (billing) API clients.
"""

import csv
import os
from datetime import datetime, timezone
from io import StringIO

from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from alibabacloud_rds20140815.client import Client as RdsClient
from alibabacloud_tea_openapi.models import Config
from alibabacloud_vpc20160428.client import Client as VpcClient

# Maps an engine-independent performance-key alias to the engine-specific
# key name(s) it expands to. An empty list means the alias expands to nothing
# for that engine (see ``transform_perf_key``).
PERF_KEYS = {
    "mysql": {
        "MemCpuUsage": ["MySQL_MemCpuUsage"],
        "QPSTPS": ["MySQL_QPSTPS"],
        "Sessions": ["MySQL_Sessions"],
        "COMDML": ["MySQL_COMDML"],
        "RowDML": ["MySQL_RowDML"],
        "SpaceUsage": ["MySQL_DetailedSpaceUsage"],
        "ThreadStatus": ["MySQL_ThreadStatus"],
        "MBPS": ["MySQL_MBPS"],
        "DetailedSpaceUsage": ["MySQL_DetailedSpaceUsage"],
    },
    "pgsql": {
        "MemCpuUsage": ["MemoryUsage", "CpuUsage"],
        "QPSTPS": ["PolarDBQPSTPS"],
        "Sessions": ["PgSQL_Session"],
        "COMDML": ["PgSQL_COMDML"],
        "RowDML": ["PolarDBRowDML"],
        "SpaceUsage": ["PgSQL_SpaceUsage"],
        "ThreadStatus": [],
        "MBPS": [],
        # Previously pointed at the SQL Server key (swapped with the
        # "sqlserver" entry below); use the PostgreSQL key, matching
        # "SpaceUsage" above.
        "DetailedSpaceUsage": ["PgSQL_SpaceUsage"],
    },
    "sqlserver": {
        "MemCpuUsage": ["SQLServer_CPUUsage"],
        "QPSTPS": ["SQLServer_QPS", "SQLServer_IOPS"],
        "Sessions": ["SQLServer_Sessions"],
        "COMDML": [],
        "RowDML": [],
        "SpaceUsage": ["SQLServer_DetailedSpaceUsage"],
        "ThreadStatus": [],
        "MBPS": [],
        # Previously pointed at the PostgreSQL key (swapped with the
        # "pgsql" entry above); use the SQL Server key, matching
        # "SpaceUsage" above.
        "DetailedSpaceUsage": ["SQLServer_DetailedSpaceUsage"],
    },
}


def transform_to_iso_8601(dt: datetime, timespec: str) -> str:
    """Format a datetime as a UTC ISO 8601 string with a ``Z`` suffix.

    Args:
        dt: The datetime to format. A naive datetime is interpreted as
            local time by ``datetime.astimezone`` before conversion to UTC.
        timespec: Precision passed to ``datetime.isoformat`` (for example
            ``"minutes"`` or ``"seconds"``).

    Returns:
        The UTC timestamp with the ``+00:00`` offset replaced by ``Z``.
    """
    utc_text = dt.astimezone(timezone.utc).isoformat(timespec=timespec)
    return utc_text.replace("+00:00", "Z")


def transform_to_datetime(s: str) -> datetime:
    """Parse a ``YYYY-MM-DD HH:MM[:SS]`` string into a naive datetime.

    Args:
        s: Timestamp text, with or without the seconds component.

    Returns:
        The parsed datetime; no timezone is attached.

    Raises:
        ValueError: If ``s`` matches neither accepted format.
    """
    try:
        return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Fall back to minute precision; a second failure propagates.
        return datetime.strptime(s, "%Y-%m-%d %H:%M")


def transform_perf_key(db_type: str, perf_keys: list[str]) -> list[str]:
    """Expand performance-key aliases into engine-specific key names.

    Args:
        db_type: Engine name, matched case-insensitively against the
            top-level keys of ``PERF_KEYS``.
        perf_keys: Aliases and/or already engine-specific key names.

    Returns:
        A flat list in input order. Known aliases are replaced by their
        mapped keys (possibly none); any other key is kept as-is. For an
        engine that has no entry in ``PERF_KEYS`` every key is kept as-is.
    """
    # Unknown engines get an empty alias table so keys pass through
    # unchanged instead of failing with a bare KeyError.
    aliases = PERF_KEYS.get(db_type.lower(), {})
    expanded: list[str] = []
    for key in perf_keys:
        expanded.extend(aliases.get(key, [key]))
    return expanded


def json_array_to_csv(data) -> str:
    """Serialize a list of records to CSV text with a sorted header row.

    Args:
        data: A list whose items are dicts or objects exposing ``to_map()``.
            Items of any other kind are skipped.

    Returns:
        CSV text whose columns are the sorted union of all record keys.
        ``None`` values and keys missing from a record are written as empty
        cells. Returns ``""`` when ``data`` is empty, is not a list, or
        yields no column names.
    """
    if not data or not isinstance(data, list):
        return ""

    # Normalize every usable item to a mapping once, so ``to_map()`` is not
    # invoked a second time when the rows are written.
    rows = []
    for item in data:
        if isinstance(item, dict):
            rows.append(item)
        elif hasattr(item, 'to_map'):
            rows.append(item.to_map())

    fieldnames = set()
    for row in rows:
        fieldnames.update(row.keys())
    if not fieldnames:
        return ""

    output = StringIO()
    writer = csv.DictWriter(output, fieldnames=sorted(fieldnames))
    writer.writeheader()
    for row in rows:
        writer.writerow({k: '' if v is None else v for k, v in row.items()})
    return output.getvalue()


def _build_config(region_id: str) -> Config:
    """Build the OpenAPI config shared by all client factories.

    Credentials are read from the ``ALIBABA_CLOUD_ACCESS_KEY_ID``,
    ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` and ``ALIBABA_CLOUD_SECURITY_TOKEN``
    environment variables; an unset variable is passed through as ``None``.
    """
    return Config(
        access_key_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        access_key_secret=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        security_token=os.getenv('ALIBABA_CLOUD_SECURITY_TOKEN'),
        region_id=region_id,
        protocol="https",
        # NOTE(review): presumably milliseconds (10 s connect, 300 s read);
        # confirm against the SDK's Config documentation.
        connect_timeout=10 * 1000,
        read_timeout=300 * 1000,
    )


def get_rds_client(region_id: str) -> RdsClient:
    """Get RDS client instance.

    Args:
        region_id: The region ID for the RDS client.

    Returns:
        RdsClient: The RDS client instance for the specified region.
    """
    return RdsClient(_build_config(region_id))


def get_vpc_client(region_id: str) -> VpcClient:
    """Get VPC client instance.

    Args:
        region_id: The region ID for the VPC client.

    Returns:
        VpcClient: The VPC client instance for the specified region.
    """
    return VpcClient(_build_config(region_id))


def get_bill_client(region_id: str) -> BssOpenApi20171214Client:
    """Get BSS (billing) OpenAPI client instance.

    Args:
        region_id: The region ID for the billing client.

    Returns:
        BssOpenApi20171214Client: The billing client instance for the
        specified region.
    """
    return BssOpenApi20171214Client(_build_config(region_id))