aibond/cases/hdfs-analyse/fault_injector.py (81 lines of code) (raw):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Command-line fault-injection tool that acts on a remote host over SSH.

Every fault type is a class exposing ``add_args``, ``inject`` and ``recover``;
``FAULT_TYPES`` maps the sub-command name to the class that implements it.
"""
import argparse
import shlex

import paramiko


class BaseFault:
    """SSH plumbing shared by all fault types."""

    # One-line help text shown next to the sub-command; subclasses override it.
    description = ""

    def __init__(self, ip, username, password):
        """Open an SSH session to the target host.

        Args:
            ip: Host name or IP address to connect to.
            username: SSH login name.
            password: SSH password; may be ``None`` (it is passed to
                ``SSHClient.connect`` unchanged).
        """
        self.ssh_client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts host keys that are not already
        # known, so the server identity is not verified. Kept because the tool
        # relies on it today; confirm this is acceptable for the environments
        # it is run against.
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh_client.connect(ip, username=username, password=password)
        except Exception:
            # Do not leave a half-initialised client behind on failure.
            self.ssh_client.close()
            raise

    @staticmethod
    def add_args(parser):
        """Register the arguments common to every fault type on ``parser``."""
        parser.add_argument("--action", required=True, choices=["inject", "recover"], help="执行故障注入或故障恢复")
        parser.add_argument("--ip", "--host", required=True, help="目标地址 可以为主机名或IP")
        parser.add_argument("--username", "-l", required=True, help="SSH 用户名")
        parser.add_argument("--password", required=False, help="SSH 密码")

    def execute_command(self, command):
        """Run ``command`` on the remote host and wait for it to finish.

        Args:
            command: Shell command line to execute remotely.

        Returns:
            A ``(ret_code, stdout_text, stderr_text)`` tuple.
        """
        _stdin, stdout, stderr = self.ssh_client.exec_command(command)
        # Drain the output streams BEFORE asking for the exit status: waiting
        # for the status first can block forever once the remote side has
        # produced more output than the channel window holds.
        output_stdout = stdout.read().decode("utf-8", errors="replace")
        output_stderr = stderr.read().decode("utf-8", errors="replace")
        ret_code = stdout.channel.recv_exit_status()
        return (ret_code, output_stdout, output_stderr)

    def close_connection(self):
        """Close the SSH session."""
        self.ssh_client.close()

    def inject(self, args):
        """Inject the fault; must be provided by the concrete fault class."""
        raise NotImplementedError

    def recover(self, args):
        """Undo the fault; must be provided by the concrete fault class."""
        raise NotImplementedError


class DiskFault(BaseFault):
    """Fill the selected mounted filesystems of the target host."""

    description = "填满指定主机上的所有硬盘"

    # Name of the filler file created directly under each targeted mount point.
    FAULT_FILE_NAME = "fault_file"

    @staticmethod
    def add_args(parser):
        """Register the common arguments plus ``--target``."""
        BaseFault.add_args(parser)
        parser.add_argument("--target", required=False, default="notsystem", choices=["system", "notsystem", "all"], help="指定填满的目标 system | notsystem | all")

    def parse_df(self, text):
        """Turn ``df`` output into a list of dicts keyed by the header words.

        Blank lines are ignored and empty input yields an empty list. The
        trailing header words ``Mounted on`` name a single column, so the rest
        of each row after the preceding columns is stored under ``"Mounted"``;
        this keeps mount points that contain spaces intact.

        Args:
            text: Raw ``df`` output, header line first.

        Returns:
            One dict per data row.
        """
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        if not lines:
            return []
        keys = lines[0].split()
        column_count = len(keys)
        if keys[-2:] == ["Mounted", "on"]:
            column_count -= 1
        return [
            dict(zip(keys, line.split(None, column_count - 1)))
            for line in lines[1:]
        ]

    # Historical (misspelled) name, kept so existing callers continue to work.
    pasrse_df = parse_df

    def get_df(self, ip):
        """Print and return the remote host's current ``df`` output.

        Args:
            ip: Host label used only in the progress message.

        Returns:
            The standard output of the remote ``df`` command.
        """
        print(f"Get the current disk capacity and status. \nhost {ip}")
        # -P asks for the POSIX format (one line per filesystem) and -k for
        # 1024-byte units, which is what the "k" suffix given to fallocate in
        # inject() assumes.
        _ret_code, stdout, _stderr = self.execute_command("df -Pk")
        print(stdout)
        return stdout

    def get_disks(self, df_data, target):
        """Select the ``df`` rows that match ``target``.

        Args:
            df_data: Rows as returned by ``parse_df``.
            target: ``"system"`` selects the row mounted on ``/``;
                ``"notsystem"`` selects rows whose filesystem starts with
                ``/dev`` and that are not mounted on ``/``; ``"all"`` selects
                both groups.

        Returns:
            The matching rows, in their original order.
        """
        want_system = target in ("system", "all")
        want_other = target in ("notsystem", "all")
        mounts = []
        for disk in df_data:
            if disk.get("Mounted") == "/":
                if want_system:
                    mounts.append(disk)
            elif want_other and disk.get("Filesystem", "").startswith("/dev"):
                mounts.append(disk)
        return mounts

    def _fault_file(self, disk):
        """Return the shell-quoted path of the filler file for ``disk``."""
        path = disk["Mounted"].rstrip("/") + "/" + self.FAULT_FILE_NAME
        return shlex.quote(path)

    def _fill_command(self, disk):
        """Build the fill command, or ``None`` when no free space is reported."""
        available = disk.get("Available", "")
        if not available.isdigit() or int(available) == 0:
            return None
        return f"fallocate -l {available}k {self._fault_file(disk)}"

    def _remove_command(self, disk):
        """Build the command that deletes the filler file."""
        return f"rm -f {self._fault_file(disk)}"

    def _run_on_disks(self, args, banner, build_command):
        """Run one command per targeted disk, then print ``df`` again.

        Args:
            args: Parsed command-line arguments (``ip`` and ``target`` are read).
            banner: Text appended to the per-disk progress line.
            build_command: Callable mapping a ``df`` row to a command string,
                or to ``None`` to skip that row.
        """
        df_rows = self.parse_df(self.get_df(args.ip))
        for disk in self.get_disks(df_rows, args.target):
            command = build_command(disk)
            if command is None:
                print(f"skip {disk['Mounted']}: no usable free space reported")
                continue
            print(f"目录 {disk['Mounted']} {banner}")
            print(">>> " + command)
            (ret, stdout, stderr) = self.execute_command(command)
            print(f"ret code: {ret}\nstdout: {stdout}\nstderr: {stderr}\n")
        self.get_df(args.ip)

    def inject(self, args):
        """Fill every targeted mount by allocating a file of the free size."""
        self._run_on_disks(args, "开始填充", self._fill_command)

    def recover(self, args):
        """Delete the filler file from every targeted mount."""
        self._run_on_disks(args, "开始释放", self._remove_command)


# More fault types can be added by following the DiskFault pattern.
FAULT_TYPES = {
    "disk_full": DiskFault
}


def main(argv=None):
    """Parse the command line and run the requested fault action.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="故障注入工具")
    subparsers = parser.add_subparsers(title="故障类型", dest="type")
    # Without this a missing sub-command leaves args.type as None and the
    # FAULT_TYPES lookup below fails with a KeyError instead of a usage error.
    # The attribute form also works on interpreters older than 3.7.
    subparsers.required = True

    # Create one sub-parser per fault type and let the class add its arguments.
    for fault_name, fault_class in FAULT_TYPES.items():
        subparser = subparsers.add_parser(fault_name, help=fault_class.description)
        fault_class.add_args(subparser)

    args = parser.parse_args(argv)

    fault_type = FAULT_TYPES[args.type](args.ip, args.username, args.password)
    try:
        if args.action == "inject":
            fault_type.inject(args)
        elif args.action == "recover":
            fault_type.recover(args)
    finally:
        # Always release the SSH session, even when the action raised.
        fault_type.close_connection()


if __name__ == "__main__":
    main()