aibond/cases/hdfs-analyse/hdfscluster.py (59 lines of code) (raw):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Run HDFS diagnostic commands on a cluster over SSH."""

import shlex
import socket
import time
from typing import Dict, Optional

import paramiko
from paramiko.buffered_pipe import PipeTimeout


class HDFSCluster():
    """This is a class that can operate on an HDFS cluster."""

    # Connected paramiko.SSHClient, set by __init__.
    _client = None
    # Host name/address the SSH client is connected to.
    _client_host = None

    def __init__(self, host: str, username: str = "emr-user", password: Optional[str] = None):
        """Initialize the HDFS cluster by connecting to the host."""
        self._client = paramiko.SSHClient()
        self._client_host = host
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification. Load known host keys instead if the network is not
        # trusted.
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._client.connect(host, username=username, password=password)

    def __enter__(self) -> "HDFSCluster":
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the SSH connection when leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying SSH connection, if one was created."""
        if self._client is not None:
            self._client.close()

    @staticmethod
    def _drain(stream) -> str:
        """Return the lines still readable from *stream* before EOF or timeout."""
        lines = []
        while True:
            try:
                line = stream.readline()
            except socket.timeout:
                break
            if not line:
                # readline() returns an empty string at EOF; stop instead of
                # spinning forever once the remote command has finished.
                break
            # readline() keeps the line terminator, so lines are joined as-is.
            lines.append(line)
        return "".join(lines)

    def exec_command(self, command: str, host: Optional[str] = None, timeout: int = 5) -> Dict:
        """Run a shell command on the connected host or, via ssh, on another host.

        Args:
            command: Shell command line to execute.
            host: Target host. When ``None`` or equal to the connected host the
                command runs directly on the connected host; otherwise it is
                run as ``ssh <host> <command>`` from the connected host. A
                ``host:port`` value is reduced to its host part.
            timeout: Channel read timeout in seconds.

        Returns:
            A dict with the keys ``"stdout"``, ``"stderr"`` and
            ``"exitStatus"``. ``"exitStatus"`` is ``999`` when reading the
            output timed out before the exit status was obtained.
        """
        if host is None or self._client_host == host:
            remote_command = command
        else:
            target = host.split(":")[0]
            # Quote both parts so a command containing quotes or other shell
            # metacharacters reaches the remote shell unchanged.
            remote_command = "ssh " + shlex.quote(target) + " " + shlex.quote(command)

        _stdin, stdout, stderr = self._client.exec_command(remote_command)
        stdout.channel.settimeout(timeout)

        retcode = 999
        try:
            output_stdout = stdout.read().decode('utf-8', errors='replace')
            output_stderr = stderr.read().decode('utf-8', errors='replace')
            retcode = stdout.channel.recv_exit_status()
        except socket.timeout:
            # The full read timed out; fall back to whatever can still be read
            # line by line within the timeout.
            output_stdout = self._drain(stdout)
            output_stderr = self._drain(stderr)

        return {"stdout": output_stdout, "stderr": output_stderr, "exitStatus": retcode}

    def get_namenodes(self) -> str:
        """Get the namenode list of the HDFS cluster."""
        res = self.exec_command("hdfs haadmin -getAllServiceState")
        return res['stdout']

    def hdfs_touchz(self) -> Dict:
        """Create a test file in HDFS and return the full command result."""
        res = self.exec_command("hdfs dfs -touchz test-file")
        return res

    #def hdfs_cat(self, path: str) -> str:
    #    """Read the file in HDFS and delete the test file."""
    #    res = self.exec_command("hdfs dfs -cat " + path)
    #    self.exec_command("hdfs dfs -rm " + path)
    #    return res['stdout']

    def namenode_log(self, host: str) -> str:
        """Get the last 2000 characters of the tail of a namenode's log on *host*."""
        res = self.exec_command("cd /mnt/disk1/log/hadoop-hdfs && tail -n 30 hadoop-hdfs-namenode-*.log", host)
        return res['stdout'][-2000:]

    def get_local_disk_free(self, host: str) -> str:
        """Return the stdout of ``df`` run on *host*."""
        res = self.exec_command("df", host)
        return res['stdout']


if __name__ == "__main__":
    class_instance = HDFSCluster("47.93.25.211")
    try:
        resp = class_instance.hdfs_touchz()
        print(resp)
    finally:
        # Release the SSH connection even if the command fails.
        class_instance.close()