odps/mars_extension/oscar/filesystem.py (277 lines of code) (raw):

# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mmap
import os
import shutil
import tempfile
from collections import OrderedDict
from urllib.parse import urlparse

import numpy as np
from mars.lib.filesystem import FileSystem

from ...core import ODPS
from ...compat import futures, BytesIO
from ...errors import NoSuchObject
from ...tunnel.volumetunnel import VolumeTunnel
from ...utils import to_binary, to_str

DIR_ERROR_MSG = (
    "Argument `path` illegal, expect to be "
    "odps:///**project**/volumes/**volume**/, "
    "got {}"
)
FILE_ERROR_MSG = (
    "Argument `path` illegal, expect to be "
    "odps:///**project**/volumes/**volume**/**file**, "
    "got {}"
)
# Raised by operations that accept either a volume or a partition path.
_VOLUME_OR_PARTITION_ERROR_MSG = (
    "Argument `path` illegal, expect to be "
    "odps:///**project**/volumes/**volume**/ or "
    "odps:///**project**/volumes/**volume**/**partition**/, "
    "got {}"
)
DEFAULT_CHUNK_SIZE = 512 * 1024**2
META_FILE_NAME = "_meta_"


class _VolumeFileObject(object):
    """
    File-like wrapper around a single volume partition.

    Writes are buffered in memory and flushed as numbered volume files
    ("0", "1", ...) once ``chunk_size`` bytes have accumulated. When more than
    one file is produced, an extra ``_meta_`` file listing ``name,size`` pairs
    is written on close. Reads of a multi-file partition first download all
    parts into one local staging file, which is then read sequentially.

    :param volume_parted: volume partition object to read from / write to
    :param mode: mode string given by the caller; stored but not inspected here
    :param temp_dir: directory for the local staging file; a fresh temporary
        directory is created (and removed on close) when omitted
    :param chunk_size: maximum bytes per volume file, defaults to
        ``DEFAULT_CHUNK_SIZE``
    """

    def __init__(self, volume_parted, mode, temp_dir=None, chunk_size=None):
        self._volume_parted = volume_parted
        self._mode = mode
        self._temp_dir = temp_dir
        # fall back to the module default so that write() never has to do
        # arithmetic with None
        self._chunk_size = chunk_size or DEFAULT_CHUNK_SIZE

        # writer
        self._index = 0
        self._writer = None
        self._write_buffer = None
        self._meta = dict()

        # reader
        self._reader = None
        self._read_filename = None
        # temporary directory created by this object, removed on close
        self._created_temp_dir = None

    def _read_multi_volume_files(self):
        """
        Download every part listed in the meta file into one local file and
        open that file as the reader.
        """
        # get meta file first
        with self._volume_parted.open_reader(META_FILE_NAME) as meta_reader:
            content = to_str(meta_reader.read())

        meta = OrderedDict()
        length = 0
        for line in content.split("\n"):
            if "," in line:
                filename, size = line.split(",", 1)
                size = int(size)
                meta[filename] = size
                length += size

        temp_dir = self._temp_dir
        if not temp_dir:
            # remember the directory we created so close() can remove it
            temp_dir = self._created_temp_dir = tempfile.mkdtemp()
        write_filename = os.path.join(
            temp_dir, self._volume_parted.volume.name, self._volume_parted.name
        )
        os.makedirs(os.path.dirname(write_filename), exist_ok=True)

        # reserve the full length up front without building zero-filled
        # byte strings in memory
        with open(write_filename, "wb") as f:
            f.truncate(length)

        # an empty file cannot be memory-mapped, and there is nothing to
        # download in that case anyway
        if length > 0:
            offsets = [0] + np.cumsum(list(meta.values())).tolist()
            volume_part = self._volume_parted
            volume_tunnel = VolumeTunnel(
                client=volume_part._client,
                project=volume_part.project,
                endpoint=volume_part.project._tunnel_endpoint,
            )

            with open(write_filename, "rb+") as f:
                m = mmap.mmap(f.fileno(), length)

            def _write(filename, start, end):
                with volume_tunnel.create_download_session(
                    volume_part.volume.name, volume_part.name, filename
                ).open() as reader:
                    content_part = reader.read()
                # each part owns a disjoint slice of the mapping
                m[start:end] = content_part

            try:
                with futures.ThreadPoolExecutor(8) as executor:
                    fs = [
                        executor.submit(_write, filename, offsets[i], offsets[i + 1])
                        for i, filename in enumerate(meta)
                    ]
                    # surface the first download error, if any
                    for future in fs:
                        future.result()
                m.flush()
            finally:
                m.close()

        self._read_filename = write_filename
        self._reader = open(write_filename, "rb")

    def read(self, size=None):
        """
        Read up to ``size`` bytes from the partition.

        The underlying reader is opened on the first call: directly on the only
        volume file when the partition holds exactly one file, otherwise on a
        local copy assembled from all parts.

        :param size: number of bytes to read, passed through to the reader
        :return: whatever the underlying reader returns
        """
        if self._reader is None:
            # metadata is only needed to decide how to open the reader, so
            # refresh it once instead of on every read
            self._volume_parted.reload()
            if self._volume_parted.file_number == 1:
                self._reader = self._volume_parted.open_reader(
                    list(self._volume_parted.files)[0].name
                )
            else:
                self._read_multi_volume_files()

        if self._read_filename is None:
            # reader opened on the volume file itself
            return self._reader.read(size=size)
        # local staging file; builtin file objects take size positionally
        return self._reader.read(size)

    def _ensure_writer_open(self):
        """Open the volume writer and a fresh buffer on first use."""
        if self._writer is None:
            self._writer = self._volume_parted.open_writer()
            self._write_buffer = BytesIO()

    def _flush_to_volume_file(self):
        """Write the buffered bytes as the next numbered volume file."""
        filename = str(self._index)
        content_part = self._write_buffer.getvalue()
        self._writer.write(filename, content_part)
        # size is recorded for the meta file written on close
        self._meta[filename] = len(content_part)
        self._index += 1
        # clear write buffer
        self._write_buffer = BytesIO()

    def write(self, content):
        """
        Append ``content`` to the partition, splitting it into volume files of
        at most ``chunk_size`` bytes.

        :param content: bytes to write
        """
        self._ensure_writer_open()
        total = len(content)
        offset = 0
        while offset < total:
            buffer_left_size = self._chunk_size - self._write_buffer.tell()
            content_part = content[offset : offset + buffer_left_size]
            self._write_buffer.write(content_part)
            offset += len(content_part)
            if self._write_buffer.tell() >= self._chunk_size:
                # write buffer full
                self._flush_to_volume_file()

    def close(self):
        """
        Flush pending writes, write the meta file when several volume files
        were produced, and release reader / writer resources.

        Calling ``close`` more than once is safe: handles are cleared after
        they have been released.
        """
        if self._writer is not None:
            if self._write_buffer.tell() > 0:
                self._flush_to_volume_file()
            self._index = 0
            if len(self._meta) > 1:
                meta_content = "\n".join(
                    "{},{}".format(filename, size)
                    for filename, size in self._meta.items()
                )
                # write meta file if file length > 1
                self._writer.write(META_FILE_NAME, to_binary(meta_content))
            self._writer.__exit__(None, None, None)
            self._writer = None
            self._write_buffer = None
            self._meta = dict()

        if self._reader is not None:
            reader, self._reader = self._reader, None
            try:
                # close the handle before deleting the file it points to
                reader.__exit__(None, None, None)
            finally:
                if self._read_filename is not None:
                    read_filename, self._read_filename = self._read_filename, None
                    os.remove(read_filename)
                if self._created_temp_dir is not None:
                    created, self._created_temp_dir = self._created_temp_dir, None
                    shutil.rmtree(created, ignore_errors=True)

    def __enter__(self):
        if self._writer or self._reader:
            (self._writer or self._reader).__enter__()
        return self

    def __exit__(self, *_):
        self.close()


class VolumeFileSystem(FileSystem):
    """
    File system backed by ODPS parted volumes.

    Schema follows: /**project**/volumes/**volume**

    A three-segment path addresses a volume and a four-segment path addresses
    a partition inside a volume, which this class treats as a file.

    :param odps: ODPS entry object; looked up from environments and then from
        the global configuration when omitted
    :param path: default path used when a method receives an empty path or a
        bare name without ``/``
    :param temp_dir: directory for local staging files of multi-part reads
    :param chunk_size: maximum bytes per volume file, defaults to
        ``DEFAULT_CHUNK_SIZE``
    :raises ValueError: if no ODPS entry object can be obtained
    """

    def __init__(self, odps=None, path=None, temp_dir=None, chunk_size=None):
        if odps is None:
            # try to get from environments
            odps = ODPS.from_environments()
        if odps is None:
            # try to get from global config
            odps = ODPS.from_global()
        if odps is None:
            # still None, raise error
            raise ValueError("`odps` should be specified for VolumeFileSystem")
        self._odps = odps
        self._path = path
        self._temp_dir = temp_dir
        self._chunk_size = chunk_size or DEFAULT_CHUNK_SIZE

    @staticmethod
    def parse_from_path(uri):
        """
        Extract the path component of ``uri`` with surrounding slashes removed.

        :return: dict with a ``path`` key, or an empty dict when the URI has
            no path component
        """
        parsed = urlparse(uri)
        options = dict()
        if parsed.path:
            options["path"] = parsed.path.strip("/")
        return options

    @staticmethod
    def _extract_info(path, is_file=False):
        """
        Split ``path`` into ``[project, volume]`` or, when ``is_file`` is
        true, ``[project, volume, partition]``.

        :raises ValueError: if the path does not have the expected number of
            segments or its second segment is not ``volumes``
        """
        error_msg = FILE_ERROR_MSG if is_file else DIR_ERROR_MSG
        options = VolumeFileSystem.parse_from_path(path)
        if "path" not in options:
            raise ValueError(error_msg.format(path))

        # keep ``path`` untouched so error messages show the caller's argument
        splits = options["path"].split("/")
        expect_size = 4 if is_file else 3
        if len(splits) != expect_size:
            raise ValueError(error_msg.format(path))
        if splits[1].lower() != "volumes":
            raise ValueError(error_msg.format(path))
        return splits[:1] + splits[2:]

    def _qualify_path(self, path):
        """
        Fall back to the default path for an empty argument and prefix a bare
        name (no ``/``) with the default path.
        """
        path = path.strip("/") or self._path
        if path and "/" not in path:
            path = "{}/{}".format(self._path, path)
        return path

    @staticmethod
    def _is_partition_path(path):
        """
        Tell whether ``path`` addresses a partition (four segments) rather
        than a volume (three segments).

        :raises ValueError: for any other number of segments
        """
        fpath = VolumeFileSystem.parse_from_path(path).get("path", "")
        segment_count = len(fpath.split("/"))
        if segment_count == 3:
            return False
        if segment_count == 4:
            return True
        raise ValueError(_VOLUME_OR_PARTITION_ERROR_MSG.format(path))

    def ls(self, path):
        """
        List the partitions of a volume as ``odps:///`` URIs.

        :raises FileNotFoundError: if the volume does not exist
        """
        path = path.strip("/") or self._path
        project, volume = self._extract_info(path)
        try:
            files = self._odps.list_volume_partitions(volume, project=project)
        except NoSuchObject as exc:
            raise FileNotFoundError("File {} not found".format(path)) from exc
        return [
            "odps:///{}/volumes/{}/{}".format(project, volume, f.name) for f in files
        ]

    def delete(self, path, recursive: bool = False):
        """
        Delete a volume or a partition if it exists; missing targets are
        ignored. ``recursive`` is accepted but not used.

        :raises ValueError: if the path is neither a volume nor a partition
        """
        path = self._qualify_path(path)
        if self._is_partition_path(path):
            project, volume, partition = self._extract_info(path, is_file=True)
            if self._odps.exist_volume_partition(
                volume, partition=partition, project=project
            ):
                self._odps.delete_volume_partition(
                    volume, partition=partition, project=project
                )
        else:
            project, volume = self._extract_info(path)
            if self._odps.exist_volume(volume, project=project):
                self._odps.delete_volume(volume, project=project)

    def open(self, path, mode="rb"):
        """
        Open a partition as a file-like object.

        :param mode: ``rb`` or ``wb``
        :raises ValueError: if ``mode`` is unsupported or the path is not a
            partition path
        """
        if mode not in ("rb", "wb"):
            raise ValueError("`mode` can be `rb` or `wb` only")
        path = self._qualify_path(path)
        project, volume, partition = self._extract_info(path, is_file=True)
        volume_parted = self._odps.get_volume_partition(
            volume, partition=partition, project=project
        )
        return _VolumeFileObject(
            volume_parted, mode, temp_dir=self._temp_dir, chunk_size=self._chunk_size
        )

    def mkdir(self, path, create_parents=True):
        """
        Create the parted volume addressed by ``path`` if it does not exist.

        Only ``create_parents=True`` is supported.
        """
        assert create_parents, "`create_parents` must be True"
        path = path.strip("/") or self._path
        project, volume = self._extract_info(path)
        if not self._odps.exist_volume(volume, project=project):
            self._odps.create_parted_volume(volume, project=project)

    def stat(self, path):
        """
        Return ``name``, ``size`` and ``created`` of a volume or a partition.

        :raises ValueError: if the path is neither a volume nor a partition
        """
        path = self._qualify_path(path)
        if self._is_partition_path(path):
            project, volume, partition = self._extract_info(path, is_file=True)
            target = self._odps.get_volume_partition(
                volume, partition=partition, project=project
            )
        else:
            project, volume = self._extract_info(path)
            target = self._odps.get_volume(volume, project=project)
        return dict(name=path, size=target.length, created=target.creation_time)

    def _isfilestore(self):
        return True

    def cat(self, path):
        raise NotImplementedError

    def exists(self, path):
        raise NotImplementedError

    def glob(self, path, recursive=False):
        raise NotImplementedError

    def isdir(self, path):
        raise NotImplementedError

    def isfile(self, path):
        raise NotImplementedError

    def rename(self, path, new_path):
        raise NotImplementedError

    def walk(self, path):
        raise NotImplementedError