aios/tools/hape/hape_libs/utils/fs_wrapper.py (171 lines of code) (raw):

import os
import shutil
import traceback

from kazoo.client import KazooClient

from .logger import Logger
from .shell import LocalShell


## user may not have fs_util in hape environment
class FsClientBase(object):
    """Common base for the filesystem clients selected by FsWrapper.

    The constructor splits ``root_address`` (scheme removed) on ``/``: the
    first component is stored as ``_address`` and the remaining components,
    re-joined with ``/``, as ``_root_path``. Subclasses implement the actual
    operations; the defaults here raise ``NotImplementedError``.
    """

    def __init__(self, root_address, binary_path=None):
        """Parse ``root_address`` into ``_address`` and ``_root_path``.

        ``binary_path`` is accepted for interface compatibility; it is not
        used by this base class.
        """
        splits = self.suffix(root_address).split("/")
        self._address = splits[0]
        self._root_path = "/".join(splits[1:])
        Logger.debug("fs client address {}".format(self._address))
        Logger.debug("fs client root path {}".format(self._root_path))

    def suffix(self, address):
        """Return the part of ``address`` after the first ``://``.

        If ``address`` contains no ``://`` it is returned unchanged. (A plain
        ``find``-based slice would otherwise drop the first two characters.)
        """
        _, delim, rest = address.partition("://")
        return rest if delim else address

    def exists(self, path):
        raise NotImplementedError

    def get(self, path):
        raise NotImplementedError

    def mkdir(self, path):
        raise NotImplementedError

    def rm(self, path):
        raise NotImplementedError

    def write(self, content, path):
        raise NotImplementedError

    def complete_path(self, path):
        """Return ``path`` joined under the root path, with a leading ``/``."""
        return "/" + os.path.join(self._root_path, path)

    def put(self, src, dest):
        raise NotImplementedError


class HdfsClient(FsClientBase):
    """Client that runs the ``fs_util`` binary through ``LocalShell``.

    ``extend_attrs`` must contain ``"hadoop_home"`` and ``"binary_path"``;
    a missing key raises ``KeyError``.
    """

    def __init__(self, root_address, extend_attrs=None):
        # Avoid a shared mutable default; an omitted mapping behaves as {}.
        extend_attrs = {} if extend_attrs is None else extend_attrs
        self.root_address = root_address
        super(HdfsClient, self).__init__(root_address)
        hadoop_home = extend_attrs["hadoop_home"]
        binary_path = extend_attrs["binary_path"]
        self.fs_bin = os.path.join(binary_path, "usr/local/bin/fs_util")
        self.cmd = "HADOOP_HOME={} {}".format(hadoop_home, self.fs_bin)

    def execute_wrap_fs_cmd(self, cmd_str):
        """Run ``fs_util <cmd_str>`` and return its output.

        Raises ``RuntimeError`` when ``LocalShell.execute_command`` reports a
        failure through the second element of its return value.
        """
        # NOTE(review): presumably grep_text="fail" makes LocalShell flag
        # output containing "fail" -- confirm against LocalShell.
        cmd = self.cmd + " " + cmd_str
        out, fail = LocalShell.execute_command(cmd, grep_text="fail")
        if fail:
            raise RuntimeError(
                "Failed to execute fs_util command: {}, detail:{}".format(cmd, out))
        return out

    def rm(self, path):
        Logger.debug("start to delete hdfs path {}".format(path))
        self.execute_wrap_fs_cmd("rm {}".format(path))

    def get(self, path):
        """Return the output of ``fs_util cat <path>`` with surrounding whitespace stripped."""
        Logger.debug("get content of hdfs path {}".format(path))
        return self.execute_wrap_fs_cmd("cat {}".format(path)).strip()

    def mkdir(self, path):
        Logger.debug("mkdir hdfs path {}".format(path))
        self.execute_wrap_fs_cmd("mkdir -p {}".format(path))

    def exists(self, path):
        """Return True if ``fs_util ls <path>`` succeeds, False if it raises."""
        try:
            self.execute_wrap_fs_cmd("ls {}".format(path))
        except Exception:
            # Any command failure is treated as "does not exist";
            # KeyboardInterrupt / SystemExit are deliberately not swallowed.
            return False
        return True

    def put(self, src, dest):
        self.execute_wrap_fs_cmd("cp -r {} {}".format(src, dest))

    def complete_path(self, path):
        """Return ``path`` joined onto the full root address (scheme included)."""
        return os.path.join(self.root_address, path)


class LocalClient(FsClientBase):
    """Client operating on the local filesystem."""

    def __init__(self, root_address, extend_attrs=None):
        # extend_attrs is accepted for a uniform constructor signature only.
        super(LocalClient, self).__init__(root_address)

    def complete_path(self, path):
        """Return ``/<address>/<root path>/<path>``."""
        return "/" + os.path.join(self._address, self._root_path, path)

    def rm(self, path):
        # NOTE(review): unlike the other methods, the target is joined with
        # self._address. For an absolute ``path`` (what complete_path returns)
        # os.path.join discards the first component, so the path is used
        # as-is; relative paths resolve under ``self._address`` -- confirm
        # this is intended.
        target = os.path.join(self._address, path)
        LocalShell.execute_command("rm -rf {}".format(target))

    def get(self, path):
        """Return the text content of the file at ``path``."""
        with open(path, 'r') as f:
            return f.read()

    def mkdir(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    def write(self, content, path):
        """Write ``content`` to ``path``, creating the parent directory if needed.

        Returns True.
        """
        parent = os.path.dirname(path)
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        with open(path, 'w') as f:
            f.write(content)
        return True

    def exists(self, path):
        return os.path.exists(path)

    def put(self, src, dest):
        """Copy file ``src`` to ``dest``, or copy directory tree ``src`` to ``dest``."""
        if not os.path.isfile(src):
            shutil.copytree(src, dest)
            return
        dest_dir = os.path.dirname(dest)
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        shutil.copyfile(src, dest)


class ZfsClient(FsClientBase):
    """Client backed by a ``KazooClient`` connected to ``self._address``."""

    def __init__(self, root_address, extend_attrs=None):
        # extend_attrs is accepted for a uniform constructor signature only.
        super(ZfsClient, self).__init__(root_address)
        self._client = KazooClient(hosts=self._address)
        self._client.start()

    def exists(self, zfs_path):
        """Return the result of ``KazooClient.exists`` for ``zfs_path``."""
        return self._client.exists(zfs_path)

    def get(self, zfs_path):
        """Return the decoded data stored at ``zfs_path``."""
        Logger.debug("get zk path {}".format(zfs_path))
        data, stat = self._client.get(zfs_path)
        return data.decode()

    def mkdir(self, zfs_path):
        Logger.debug("create zk path {}".format(zfs_path))
        if self.exists(zfs_path):
            Logger.warning("{} already exists, no need to create".format(zfs_path))
            return
        self._client.ensure_path(zfs_path)

    def rm(self, zfs_path):
        """Recursively delete ``zfs_path`` if it exists.

        Deletion is best-effort: an exception from the delete call is logged
        at debug level and not re-raised.
        """
        Logger.debug("remove zk path {}".format(zfs_path))
        if not self.exists(zfs_path):
            Logger.warning("{} not exists, no need to remove".format(zfs_path))
            return
        try:
            self._client.delete(zfs_path, recursive=True)
        except Exception:
            Logger.debug(traceback.format_exc())

    def write(self, content, zfs_path):
        """Store ``content`` (UTF-8 encoded) at ``zfs_path``.

        The parent path is ensured first; the node is created when absent
        and overwritten otherwise.
        """
        Logger.debug("write zk path {} with content {}".format(zfs_path, content))
        path = zfs_path
        self._client.ensure_path("/".join(path.split("/")[:-1]))
        payload = content.encode('utf-8')
        if not self.exists(zfs_path):
            self._client.create(path, payload)
        else:
            self._client.set(path, payload)


class FsWrapper:
    """Facade that picks a client class from the address scheme (or ``type``).

    Except for ``put``, every operation resolves its ``path`` argument with
    the client's ``complete_path`` before delegating to the client.
    """

    FS_CLS = {
        "zfs": ZfsClient,
        "hdfs": HdfsClient,
        "jfs": HdfsClient,
        "LOCAL": LocalClient
    }

    def __init__(self, root_address, type=None, extend_attrs=None):
        """Create the underlying client.

        Args:
            root_address: address with a ``<scheme>://`` prefix, or a path
                starting with ``/`` (treated as ``LOCAL``).
            type: optional key of ``FS_CLS``; when given, the scheme of
                ``root_address`` is not inspected.
            extend_attrs: optional mapping forwarded to the client
                constructor.

        Raises:
            ValueError: ``type`` is None and no known scheme prefix matches.
            KeyError: ``type`` is not a key of ``FS_CLS``.
        """
        # Avoid a shared mutable default; an omitted mapping behaves as {}.
        extend_attrs = {} if extend_attrs is None else extend_attrs
        self._client = None  # type: FsClientBase
        if root_address.startswith("/"):
            # "/a/b" becomes "LOCAL://a/b".
            root_address = "LOCAL:/" + root_address
        if type is None:
            for proto in FsWrapper.FS_CLS:
                if root_address.startswith(proto + "://"):
                    Logger.debug("Open {} fs client {}".format(proto, root_address))
                    self._client = FsWrapper.FS_CLS[proto](
                        root_address, extend_attrs=extend_attrs)
                    # At most one scheme can match; stop scanning.
                    break
        else:
            Logger.debug("Open {} fs client {}".format(type, root_address))
            self._client = FsWrapper.FS_CLS[type](root_address, extend_attrs=extend_attrs)
        if self._client is None:
            raise ValueError("Root path [{}] should starts with one of the prefixes [{}]".format(
                root_address, list(FsWrapper.FS_CLS.keys())))
        self.root_address = root_address

    def rm(self, path):
        self._client.rm(self._client.complete_path(path))

    def mkdir(self, path):
        self._client.mkdir(self._client.complete_path(path))

    def write(self, content, path):
        return self._client.write(content, self._client.complete_path(path))

    def exists(self, path):
        return self._client.exists(self._client.complete_path(path))

    def get(self, path):
        return self._client.get(self._client.complete_path(path))

    def put(self, src, dest, filename="__tmp__"):
        """Upload ``src`` to ``dest`` unless ``dest`` already exists.

        ``filename`` is currently unused. Returns None when the destination
        exists, otherwise whatever the client's ``put`` returns.
        """
        # NOTE(review): the existence check resolves ``dest`` through
        # complete_path, while the upload uses ``dest`` as given -- confirm
        # callers pass a value for which both forms are valid.
        Logger.info("upload local files from {} to {}".format(src, dest))
        ## can't remove, some processor maybe using it
        if self.exists(dest):
            Logger.warning("{} already exists".format(dest))
            return
        return self._client.put(src, dest)

    def complete_path(self, path):
        return self._client.complete_path(path)