src/alibabacloud_rds_openapi_mcp_server/utils.py (114 lines of code) (raw):
import csv
import os
from datetime import datetime, timezone
from io import StringIO
from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from alibabacloud_rds20140815.client import Client as RdsClient
from alibabacloud_tea_openapi.models import Config
from alibabacloud_vpc20160428.client import Client as VpcClient
PERF_KEYS = {
"mysql": {
"MemCpuUsage": ["MySQL_MemCpuUsage"],
"QPSTPS": ["MySQL_QPSTPS"],
"Sessions": ["MySQL_Sessions"],
"COMDML": ["MySQL_COMDML"],
"RowDML": ["MySQL_RowDML"],
"SpaceUsage": ["MySQL_DetailedSpaceUsage"],
"ThreadStatus": ["MySQL_ThreadStatus"],
"MBPS": ["MySQL_MBPS"],
"DetailedSpaceUsage": ["MySQL_DetailedSpaceUsage"]
},
"pgsql": {
"MemCpuUsage": ["MemoryUsage", "CpuUsage"],
"QPSTPS": ["PolarDBQPSTPS"],
"Sessions": ["PgSQL_Session"],
"COMDML": ["PgSQL_COMDML"],
"RowDML": ["PolarDBRowDML"],
"SpaceUsage": ["PgSQL_SpaceUsage"],
"ThreadStatus": [],
"MBPS": [],
"DetailedSpaceUsage": ["SQLServer_DetailedSpaceUsage"]
},
"sqlserver": {
"MemCpuUsage": ["SQLServer_CPUUsage"],
"QPSTPS": ["SQLServer_QPS", "SQLServer_IOPS"],
"Sessions": ["SQLServer_Sessions"],
"COMDML": [],
"RowDML": [],
"SpaceUsage": ["SQLServer_DetailedSpaceUsage"],
"ThreadStatus": [],
"MBPS": [],
"DetailedSpaceUsage": ["PgSQL_SpaceUsage"]
}
}
def transform_to_iso_8601(dt: datetime, timespec: str):
return dt.astimezone(timezone.utc).isoformat(timespec=timespec).replace("+00:00", "Z")
def transform_to_datetime(s: str):
try:
dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
except ValueError:
dt = datetime.strptime(s, "%Y-%m-%d %H:%M")
return dt
def transform_perf_key(db_type: str, perf_keys: list[str]):
perf_key_after_transform = []
for key in perf_keys:
if key in PERF_KEYS[db_type.lower()]:
perf_key_after_transform.extend(PERF_KEYS[db_type.lower()][key])
else:
perf_key_after_transform.append(key)
return perf_key_after_transform
def json_array_to_csv(data):
if not data or not isinstance(data, list):
return ""
fieldnames = set()
for item in data:
if isinstance(item, dict):
fieldnames.update(item.keys())
elif hasattr(item, 'to_map'):
fieldnames.update(item.to_map().keys())
if not fieldnames:
return ""
output = StringIO()
writer = csv.DictWriter(output, fieldnames=sorted(fieldnames))
writer.writeheader()
for item in data:
if isinstance(item, dict):
writer.writerow({k: v if v is not None else '' for k, v in item.items()})
elif hasattr(item, 'to_map'):
writer.writerow({k: v if v is not None else '' for k, v in item.to_map().items()})
return output.getvalue()
def get_rds_client(region_id: str):
config = Config(
access_key_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
security_token=os.getenv('ALIBABA_CLOUD_SECURITY_TOKEN'),
region_id=region_id,
protocol="https",
connect_timeout=10 * 1000,
read_timeout=300 * 1000
)
client = RdsClient(config)
return client
def get_vpc_client(region_id: str) -> VpcClient:
"""Get VPC client instance.
Args:
region_id: The region ID for the VPC client.
Returns:
VpcClient: The VPC client instance for the specified region.
"""
config = Config(
access_key_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
security_token=os.getenv('ALIBABA_CLOUD_SECURITY_TOKEN'),
region_id=region_id,
protocol="https",
connect_timeout=10 * 1000,
read_timeout=300 * 1000
)
return VpcClient(config)
def get_bill_client(region_id: str):
config = Config(
access_key_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
security_token=os.getenv('ALIBABA_CLOUD_SECURITY_TOKEN'),
region_id=region_id,
protocol="https",
connect_timeout=10 * 1000,
read_timeout=300 * 1000
)
client = BssOpenApi20171214Client(config)
return client