odps/mars_extension/legacy/filesystem.py (280 lines of code) (raw):

# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mmap
import os
import tempfile
from collections import OrderedDict
from urllib.parse import urlparse

import numpy as np

try:
    from mars.lib.filesystem import FileSystem
except ImportError:
    from mars.filesystem import FileSystem

from ...core import ODPS
from ...compat import futures, BytesIO
from ...errors import NoSuchObject
from ...tunnel.volumetunnel import VolumeTunnel
from ...utils import to_binary, to_str

DIR_ERROR_MSG = (
    "Argument `path` illegal, expect to be "
    "odps:///**project**/volumes/**volume**/, "
    "got {}"
)
FILE_ERROR_MSG = (
    "Argument `path` illegal, expect to be "
    "odps:///**project**/volumes/**volume**/**file**, "
    "got {}"
)
# Size (in bytes) of the in-memory write buffer before it is flushed
# as one volume file.
DEFAULT_CHUNK_SIZE = 512 * 1024**2
# Name of the volume file listing "<filename>,<size>" lines for
# objects that were split into several volume files.
META_FILE_NAME = "_meta_"


class _VolumeFileObject(object):
    """
    File-like object backed by a volume partition.

    Written content is buffered and flushed as numbered volume files
    ("0", "1", ...) of at most ``chunk_size`` bytes each. When more than
    one file was written, a meta file named ``META_FILE_NAME`` records
    every file name and size. Reading a partition holding several files
    downloads all parts into one local temporary file first.
    """

    def __init__(self, volume_parted, mode, temp_dir=None, chunk_size=None):
        """
        :param volume_parted: volume partition object to read from / write to
        :param mode: open mode, stored as given
        :param temp_dir: directory for the local file assembled on
            multi-file reads; a fresh temporary directory is created
            when not specified
        :param chunk_size: maximal size in bytes of a single volume file,
            ``DEFAULT_CHUNK_SIZE`` when not specified
        """
        self._volume_parted = volume_parted
        self._mode = mode
        self._temp_dir = temp_dir
        # a missing chunk size used to make write() fail with TypeError
        self._chunk_size = chunk_size or DEFAULT_CHUNK_SIZE

        # writer
        self._index = 0
        self._writer = None
        self._write_buffer = None
        self._meta = dict()

        # reader
        self._reader = None
        self._read_filename = None

    def _read_multi_volume_files(self):
        """
        Download all volume files listed in the meta file into one
        local file and open that file as the reader.
        """
        # get meta file first
        with self._volume_parted.open_reader(META_FILE_NAME) as meta_reader:
            content = to_str(meta_reader.read())

        meta = OrderedDict()
        for line in content.split("\n"):
            if "," in line:
                filename, size = line.split(",", 1)
                meta[filename] = int(size)
        length = sum(meta.values())

        temp_dir = self._temp_dir or tempfile.mkdtemp()
        write_filename = os.path.join(
            temp_dir, self._volume_parted.volume.name, self._volume_parted.name
        )
        os.makedirs(os.path.dirname(write_filename), exist_ok=True)

        # pre-allocate the local file so that parts can be written
        # at their own offsets
        with open(write_filename, "wb") as f:
            for size in meta.values():
                f.write(b"\0" * size)

        # an empty file cannot be memory-mapped, and there is nothing
        # to download in that case
        if length > 0:
            offsets = [0] + np.cumsum(list(meta.values())).tolist()
            volume_part = self._volume_parted
            volume_tunnel = VolumeTunnel(
                client=volume_part._client,
                project=volume_part.project,
                endpoint=volume_part.project._tunnel_endpoint,
            )

            with open(write_filename, "rb+") as f, mmap.mmap(
                f.fileno(), length
            ) as mapped:

                def _write(filename, start, end):
                    with volume_tunnel.create_download_session(
                        volume_part.volume.name, volume_part.name, filename
                    ).open() as reader:
                        content_part = reader.read()
                        mapped[start:end] = content_part

                # the executor block is nested inside the mmap block, thus
                # all workers have finished before the mapping is closed
                with futures.ThreadPoolExecutor(8) as executor:
                    fs = [
                        executor.submit(_write, filename, offsets[i], offsets[i + 1])
                        for i, filename in enumerate(meta)
                    ]
                    # propagate the first download error, if any
                    for future in fs:
                        future.result()
                mapped.flush()

        self._read_filename = write_filename
        self._reader = open(write_filename, "rb")

    def read(self, size=None):
        """
        Read content from the volume partition.

        :param size: number of bytes to read, passed to the underlying reader
        :return: content returned by the underlying reader
        """
        self._volume_parted.reload()
        if self._volume_parted.file_number == 1:
            if self._reader is None:
                self._reader = self._volume_parted.open_reader(
                    list(self._volume_parted.files)[0].name
                )
            return self._reader.read(size=size)
        else:
            if self._reader is None:
                self._read_multi_volume_files()
            return self._reader.read(size)

    def _ensure_writer_open(self):
        """Open the partition writer and its buffer on first use."""
        if self._writer is None:
            self._writer = self._volume_parted.open_writer()
            self._write_buffer = BytesIO()

    def _flush_to_volume_file(self):
        """Write the buffered content as the next numbered volume file."""
        # write buffer full, write with a file
        filename = str(self._index)
        buffer = self._write_buffer
        content_part = buffer.getvalue()
        self._writer.write(filename, content_part)
        self._meta[filename] = buffer.tell()
        self._index += 1
        # clear write buffer
        self._write_buffer = BytesIO()

    def write(self, content):
        """
        Append content, flushing a volume file every time the buffer
        reaches the chunk size.

        :param content: sliceable content with a length, e.g. bytes
        """
        self._ensure_writer_open()
        total = len(content)
        pos = 0
        while pos < total:
            buffer_left_size = self._chunk_size - self._write_buffer.tell()
            # slice by offset instead of re-slicing the remaining content
            content_part = content[pos : pos + buffer_left_size]
            self._write_buffer.write(content_part)
            pos += len(content_part)
            if self._write_buffer.tell() >= self._chunk_size:
                # write buffer full
                self._flush_to_volume_file()

    def close(self):
        """
        Flush pending content, write the meta file when several volume
        files were produced, and release the writer and the reader.

        Calling this method more than once is safe.
        """
        if self._writer is not None:
            if self._write_buffer.tell() > 0:
                self._flush_to_volume_file()
            self._index = 0
            if len(self._meta) > 1:
                meta_content = "\n".join(
                    "{},{}".format(filename, size)
                    for filename, size in self._meta.items()
                )
                # write meta file if file length > 1
                self._writer.write(META_FILE_NAME, to_binary(meta_content))
            # reset writer state before exiting so that a repeated close()
            # does not exit the writer twice or reuse stale meta entries
            writer, self._writer = self._writer, None
            self._write_buffer = None
            self._meta = dict()
            writer.__exit__(None, None, None)

        if self._reader is not None:
            reader, self._reader = self._reader, None
            read_filename, self._read_filename = self._read_filename, None
            try:
                # close before removing: an open file cannot be removed
                # on Windows
                reader.__exit__(None, None, None)
            finally:
                if read_filename is not None:
                    # remove read file name
                    os.remove(read_filename)

    def __enter__(self):
        if self._writer or self._reader:
            (self._writer or self._reader).__enter__()
        return self

    def __exit__(self, *_):
        self.close()


class VolumeFileSystem(FileSystem):
    """
    File system whose directories are volumes and whose files are
    volume partitions.

    Schema follows: /**project**/volumes/**volume**
    """

    def __init__(self, odps=None, path=None, temp_dir=None, chunk_size=None):
        """
        :param odps: ODPS entry object; looked up from environments and
            then from global config when not specified
        :param path: default path, used when an empty path is passed and
            as prefix for paths without any slash
        :param temp_dir: directory for local files assembled on reads
        :param chunk_size: maximal size in bytes of a single volume file,
            ``DEFAULT_CHUNK_SIZE`` when not specified
        :raises ValueError: if no ODPS entry object can be obtained
        """
        if odps is None:
            # try to get from environments
            odps = ODPS.from_environments()
        if odps is None:
            # try to get from global config
            odps = ODPS.from_global()
        if odps is None:
            # still None, raise error
            raise ValueError("`odps` should be specified for VolumeFileSystem")

        self._odps = odps
        self._path = path
        self._temp_dir = temp_dir
        self._chunk_size = chunk_size or DEFAULT_CHUNK_SIZE

    @staticmethod
    def parse_from_path(uri):
        """
        Extract options from a uri.

        :param uri: uri to parse
        :return: dict with key ``path`` holding the uri path without
            leading and trailing slashes; empty when the uri has no path
        """
        parsed = urlparse(uri)
        options = dict()
        if parsed.path:
            options["path"] = parsed.path.strip("/")
        return options

    @staticmethod
    def _extract_info(path, is_file=False):
        """
        Split a path into its components.

        :param path: path of form **project**/volumes/**volume**, followed
            by one more component when ``is_file`` is True
        :param is_file: whether the path is expected to point to a file
        :return: list of path components with the ``volumes`` part removed
        :raises ValueError: if the path does not match the expected form
        """
        options = VolumeFileSystem.parse_from_path(path)
        error_msg = DIR_ERROR_MSG if not is_file else FILE_ERROR_MSG
        if "path" not in options:
            raise ValueError(error_msg.format(path))

        path = options["path"]
        splits = path.split("/")
        expect_size = 3 if not is_file else 4
        if len(splits) != expect_size:
            raise ValueError(error_msg.format(path))
        volumes = splits[1]
        if volumes.lower() != "volumes":
            raise ValueError(error_msg.format(path))
        return splits[:1] + splits[2:]

    def _qualify_path(self, path):
        """
        Strip slashes from the path, fall back to the default path when
        nothing is left, and prefix the default path when the result
        contains no slash.
        """
        path = path.strip("/") or self._path
        if "/" not in path:
            path = "{}/{}".format(self._path, path)
        return path

    def _classify_path(self, path):
        """
        Qualify the path and tell whether it has three components
        (volume) or four components (volume partition).

        :return: tuple of the qualified path and a bool which is True
            for a four-component path
        :raises ValueError: if the path has any other number of components
        """
        path = self._qualify_path(path)
        options = VolumeFileSystem.parse_from_path(path)
        depth = len(options["path"].split("/"))
        if depth == 3:
            return path, False
        if depth == 4:
            return path, True
        raise ValueError(
            "Argument `path` illegal, expect to be "
            "odps:///**project**/volumes/**volume**/ or "
            "odps:///**project**/volumes/**volume**/**partition**/, "
            "got {}".format(path)
        )

    def ls(self, path):
        """
        List partitions of a volume.

        :param path: path of the volume, default path when empty
        :return: list of ``odps:///`` uris, one for every partition
        :raises FileNotFoundError: if listing raises ``NoSuchObject``
        """
        path = path.strip("/") or self._path
        project, volume = self._extract_info(path)
        try:
            files = self._odps.list_volume_partitions(volume, project=project)
        except NoSuchObject as exc:
            raise FileNotFoundError("File {} not found".format(path)) from exc
        return [
            "odps:///{}/volumes/{}/{}".format(project, volume, f.name) for f in files
        ]

    def delete(self, path, recursive: bool = False):
        """
        Delete a volume or a volume partition when it exists.

        :param path: path of the volume or the partition
        :param recursive: not used by this implementation
        :raises ValueError: if the path is neither a volume path nor
            a partition path
        """
        path, is_file = self._classify_path(path)
        if is_file:
            project, volume, partition = self._extract_info(path, is_file=True)
            if self._odps.exist_volume_partition(
                volume, partition=partition, project=project
            ):
                self._odps.delete_volume_partition(
                    volume, partition=partition, project=project
                )
        else:
            project, volume = self._extract_info(path)
            if self._odps.exist_volume(volume, project=project):
                self._odps.delete_volume(volume, project=project)

    def open(self, path, mode="rb"):
        """
        Open a volume partition as a file-like object.

        :param path: path of the partition
        :param mode: ``rb`` or ``wb``
        :return: file-like object over the partition
        :raises ValueError: if the mode is not supported or the path
            is not a partition path
        """
        if mode not in ("rb", "wb"):
            raise ValueError("`mode` can be `rb` or `wb` only")

        path = self._qualify_path(path)
        project, volume, partition = self._extract_info(path, is_file=True)
        volume_parted = self._odps.get_volume_partition(
            volume, partition=partition, project=project
        )
        return _VolumeFileObject(
            volume_parted, mode, temp_dir=self._temp_dir, chunk_size=self._chunk_size
        )

    def mkdir(self, path, create_parents=True):
        """
        Create a parted volume when it does not exist.

        :param path: path of the volume, default path when empty
        :param create_parents: must be true
        """
        # kept as an assertion so that the raised exception type
        # stays the same for existing callers
        assert create_parents
        path = path.strip("/") or self._path
        project, volume = self._extract_info(path)
        if not self._odps.exist_volume(volume, project=project):
            self._odps.create_parted_volume(volume, project=project)

    def stat(self, path):
        """
        Get information about a volume or a volume partition.

        :param path: path of the volume or the partition
        :return: dict with keys ``name``, ``size`` and ``created``
        :raises ValueError: if the path is neither a volume path nor
            a partition path
        """
        path, is_file = self._classify_path(path)
        if is_file:
            project, volume, partition = self._extract_info(path, is_file=True)
            volume_parted = self._odps.get_volume_partition(
                volume, partition=partition, project=project
            )
            stat = dict(
                name=path,
                size=volume_parted.length,
                created=volume_parted.creation_time,
            )
        else:
            project, volume = self._extract_info(path)
            vol = self._odps.get_volume(volume, project=project)
            stat = dict(name=path, size=vol.length, created=vol.creation_time)
        return stat

    def _isfilestore(self):
        return True

    def cat(self, path):
        raise NotImplementedError

    def exists(self, path):
        raise NotImplementedError

    def glob(self, path, recursive=False):
        raise NotImplementedError

    def isdir(self, path):
        raise NotImplementedError

    def isfile(self, path):
        raise NotImplementedError

    def rename(self, path, new_path):
        raise NotImplementedError

    def walk(self, path):
        raise NotImplementedError