odps/models/resourcefile.py (353 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import codecs
import hashlib
import os
import random

from .. import compat
from ..compat import six
from ..config import options
from .resource import FileResource

RESOURCE_SIZE_MAX = 512 * 1024 * 1024  # a single resource's size must be at most 512M


class ResourceFile(object):
    """File-like base class for reading and writing a ``FileResource``.

    This class parses the open mode, converts content between text and
    bytes, and provides iteration and context-manager support (leaving a
    ``with`` block calls ``close``). The actual I/O methods raise
    ``NotImplementedError`` here and are implemented by subclasses.
    """

    __slots__ = (
        "resource",
        "mode",
        "_opened",
        "size",
        "_open_binary",
        "_encoding",
        "_overwrite",
    )

    def __init__(self, resource, mode="r", encoding="utf-8", overwrite=None):
        """
        :param resource: the ``FileResource`` this file reads from / writes to
        :param mode: open mode; a ``"b"`` selects binary content and the rest
            must be a valid ``FileResource.Mode`` value
        :param encoding: codec used to convert between text and bytes
        :param overwrite: stored for subclasses; ``StreamResourceFile.close``
            derives it from ``resource._is_create()`` when left as ``None``
        """
        self.resource = resource
        self._open_binary = "b" in mode
        mode = mode.replace("b", "")
        self.mode = FileResource.Mode(mode)
        self._encoding = encoding

        if self.mode in (FileResource.Mode.WRITE, FileResource.Mode.TRUNCEREADWRITE):
            # existing content is discarded, so the file starts out empty
            self.size = 0
        else:
            # existing content is kept; ``size`` is left for the subclass to set.
            # NOTE(review): StreamResourceFile never sets it in read mode, so
            # ``tell()`` there raises AttributeError.
            self.resource._reload_size()
        self._opened = True
        self._overwrite = overwrite

    def _convert(self, content):
        """Return *content* as bytes in binary mode, or as text otherwise."""
        if self._open_binary and isinstance(content, six.text_type):
            return content.encode(self._encoding)
        elif not self._open_binary and isinstance(content, six.binary_type):
            return content.decode(self._encoding)
        return content

    def _read_resource(self, offset=None, read_size=None):
        """Read (part of) the resource through its parent collection."""
        return self.resource.parent.read_resource(
            self.resource,
            text_mode=not self._open_binary,
            encoding=self._encoding,
            offset=offset,
            read_size=read_size,
        )

    def _new_buffer(self, content=None):
        """Create an in-memory bytes or text buffer matching the open mode."""
        io_clz = six.BytesIO if self._open_binary else six.StringIO
        return io_clz() if content is None else io_clz(content)

    # -- file-object interface, implemented by subclasses --------------------

    def read(self, size=-1):
        raise NotImplementedError

    def readline(self, size=-1):
        raise NotImplementedError

    def readlines(self, sizehint=-1):
        raise NotImplementedError

    def write(self, content):
        raise NotImplementedError

    def writelines(self, seq):
        raise NotImplementedError

    def seek(self, pos, whence=compat.SEEK_SET):  # io.SEEK_SET
        raise NotImplementedError

    def seekable(self):
        raise NotImplementedError

    def tell(self):
        raise NotImplementedError

    def truncate(self, size=None):
        raise NotImplementedError

    def flush(self):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError

    def _iter(self):
        raise NotImplementedError

    def __iter__(self):
        return self._iter()

    def _next(self):
        raise NotImplementedError

    def __next__(self):
        return self._next()

    next = __next__  # Python 2 iterator protocol

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()


class LocalResourceFile(ResourceFile):
    """Resource file whose content is held in a seekable in-memory buffer.

    In WRITE / TRUNCEREADWRITE modes the buffer starts empty; otherwise it is
    whatever ``_read_resource()`` returns. Writes and truncation mark the file
    dirty, and ``flush``/``close`` then upload the buffer through the
    resource's parent (``create`` for a new resource, ``update`` otherwise).
    """

    __slots__ = "_fp", "_need_commit"

    def __init__(self, resource, mode="r", encoding="utf-8", overwrite=None):
        super(LocalResourceFile, self).__init__(
            resource, mode=mode, encoding=encoding, overwrite=overwrite
        )
        if self.mode in (FileResource.Mode.WRITE, FileResource.Mode.TRUNCEREADWRITE):
            self._fp = self._new_buffer()
        else:
            self._fp = self._read_resource()
        self._sync_size()
        self._need_commit = False

    def _sync_size(self):
        """Set ``size`` to the buffer length, keeping the current position."""
        curr_pos = self.tell()
        self.seek(0, compat.SEEK_END)
        self.size = self.tell()
        self.seek(curr_pos)

    def read(self, size=-1):
        return self._fp.read(size)

    def readline(self, size=-1):
        return self._fp.readline(size)

    def readlines(self, sizehint=-1):
        return self._fp.readlines(sizehint)

    def _check_size(self):
        """Raise ``IOError`` once the content exceeds ``RESOURCE_SIZE_MAX``."""
        if self.size > RESOURCE_SIZE_MAX:
            # integer division so the message reads "512M", not "512.0M"
            raise IOError(
                "Single resource's max size is %sM" % (RESOURCE_SIZE_MAX // (1024**2))
            )

    def write(self, content):
        """Write *content* (converted to the open mode) and return the
        buffer's write result. Append modes always write at the end."""
        content = self._convert(content)
        length = len(content)

        if self.mode in (FileResource.Mode.APPEND, FileResource.Mode.APPENDREADWRITE):
            self.seek(0, compat.SEEK_END)

        if length > 0:
            self._need_commit = True

        res = self._fp.write(content)
        self._sync_size()
        self._check_size()
        return res

    def writelines(self, seq):
        """Write every item of *seq*; same semantics as ``write``."""
        seq = [self._convert(s) for s in seq]
        length = sum(len(s) for s in seq)

        if self.mode in (FileResource.Mode.APPEND, FileResource.Mode.APPENDREADWRITE):
            self.seek(0, compat.SEEK_END)

        if length > 0:
            self._need_commit = True

        res = self._fp.writelines(seq)
        self._sync_size()
        self._check_size()
        return res

    def seek(self, pos, whence=compat.SEEK_SET):
        return self._fp.seek(pos, whence)

    def seekable(self):
        return True

    def tell(self):
        return self._fp.tell()

    def truncate(self, size=None):
        """Truncate the buffer, keeping the current position, and mark the
        file dirty."""
        self._fp.truncate(size)
        self._sync_size()
        self._need_commit = True

    def flush(self):
        """Upload the buffer if it changed since the last flush."""
        if self._need_commit:
            is_create = self.resource._is_create()

            resources = self.resource.parent
            if is_create:
                resources.create(obj=self.resource, file_obj=self._fp)
            else:
                resources.update(obj=self.resource, file_obj=self._fp)

            self._need_commit = False

    def close(self):
        """Flush pending changes and release the buffer; repeated calls are
        no-ops."""
        if not self._opened:
            # already closed
            return
        self.flush()

        self._fp = None
        self.size = 0
        self._need_commit = False
        self._opened = False

    def _iter(self):
        return self._fp.__iter__()

    def _next(self):
        return next(self._fp)


class StreamResourceFile(ResourceFile):
    """Non-seekable resource file that transfers content in chunks.

    Only READ and WRITE modes are accepted (a ``"+"`` in the mode is dropped).

    Reading fetches ``options.resource_chunk_size`` at a time from the resource
    and serves it from an in-memory buffer. Writing accumulates content in the
    buffer; each ``flush`` uploads it as a temporary part resource, and
    ``close`` merges all parts into the target resource along with an MD5
    digest of everything written.
    """

    __slots__ = (
        "_md5_digest",
        "_buffer",
        "_buffered_size",
        "_resource_parts",
        "_resource_counter",
        "_chunk_size",
        "_is_source_exhausted",
        "_source_offset",
        "_rand_id",
        "_decoder",
    )

    def __init__(self, resource, mode="r", encoding="utf-8", overwrite=None):
        mode = mode.replace("+", "")
        super(StreamResourceFile, self).__init__(
            resource, mode=mode, encoding=encoding, overwrite=overwrite
        )
        if self.mode not in (FileResource.Mode.READ, FileResource.Mode.WRITE):
            raise compat.UnsupportedOperation(
                "Unsupported access mode %s under streaming mode" % mode
            )
        self._md5_digest = hashlib.md5()
        self._resource_parts = []
        self._resource_counter = 0
        self._buffered_size = 0
        self._chunk_size = options.resource_chunk_size
        self._rebuild_buffer()
        self._is_source_exhausted = False
        self._source_offset = 0
        # random component of part-resource names (presumably to keep parts
        # of different writers apart)
        self._rand_id = random.randint(0, 999999)
        # incremental text decoder, created lazily on the first text read
        self._decoder = None

    def _rebuild_buffer(self):
        """Replace the buffer with an empty one and return the old buffer
        (``None`` on the first call)."""
        self._buffer, buffer = self._new_buffer(), getattr(self, "_buffer", None)
        self._buffered_size = 0
        return buffer

    def _build_part_resource_name(self):
        """Return a fresh temporary name for the next uploaded part."""
        name = "%s.part.tmp.%06d.%06d" % (
            self.resource.name,
            self._rand_id,
            self._resource_counter,
        )
        self._resource_counter += 1
        return name

    def _decode_chunk(self, data, final):
        """Convert one chunk read from the resource to the open mode.

        Byte chunks in text mode go through an incremental decoder, so a
        multi-byte character split across two chunks is reassembled instead
        of raising ``UnicodeDecodeError``. Everything else uses ``_convert``.
        """
        if self._open_binary or not isinstance(data, six.binary_type):
            return self._convert(data)
        if self._decoder is None:
            self._decoder = codecs.getincrementaldecoder(self._encoding)()
        return self._decoder.decode(data, final)

    def _load_next_offset(self):
        """Refill the buffer with the next chunk of the resource.

        Does nothing once the source is exhausted. ``_buffered_size`` is set
        to the length of the converted content (characters in text mode),
        which is the unit the read methods consume it in.
        """
        if self._is_source_exhausted:
            return

        while True:
            buf = self.resource.parent.read_resource(
                self.resource, offset=self._source_offset, read_size=self._chunk_size
            )
            data = buf.read()
            is_eof = buf.is_eof
            content = self._decode_chunk(data, is_eof)
            consumed = buf.tell()
            self._source_offset += consumed
            self._is_source_exhausted = is_eof
            # A text chunk holding only the leading bytes of a character
            # decodes to nothing. Callers treat an empty buffer as end of data,
            # so fetch more. Stop if the read made no progress to avoid looping.
            if content or is_eof or not consumed:
                break

        self._rebuild_buffer()
        self._buffer.write(content)
        self._buffer.seek(0, os.SEEK_SET)
        self._buffered_size = len(content)

    def read(self, size=-1):
        """Read up to *size* units, or everything remaining when negative."""
        buf = self._new_buffer()
        size_to_read = size
        while size_to_read != 0:
            if self._buffered_size == 0:
                self._load_next_offset()
                if self._buffered_size == 0:
                    break  # no more data
            res = self._buffer.read(size_to_read)
            buf.write(res)
            res_len = len(res)
            if size_to_read > 0:
                size_to_read -= res_len
            self._buffered_size -= res_len
        return buf.getvalue()

    def _is_line_terminated(self, line):
        # The buffers split lines on "\n" in both modes, so test for "\n"
        # rather than os.linesep (which is "\r\n" on Windows and would make
        # readline merge lines there).
        terminator = b"\n" if self._open_binary else "\n"
        return line.endswith(terminator)

    def readline(self, size=-1):
        """Read one line, spanning chunk boundaries; at most *size* units
        when *size* is positive."""
        line_buf = self._new_buffer()
        size_to_read = size
        while size_to_read != 0:
            if self._buffered_size == 0:
                self._load_next_offset()
                if self._buffered_size == 0:
                    break  # no more data
            res = self._buffer.readline(size_to_read)
            # read and concatenate to existing line
            line_buf.write(res)
            res_len = len(res)
            if size_to_read > 0:
                size_to_read -= res_len
            self._buffered_size -= res_len
            if self._is_line_terminated(res):
                break
        return line_buf.getvalue()

    def readlines(self, sizehint=-1):
        """Read lines, joining lines that span chunk boundaries.

        A positive *sizehint* stops reading once that many complete lines
        have been gathered. NOTE(review): the remaining count is also passed
        to the buffer's ``readlines`` as its (size-based) hint, so this differs
        from the standard size-hint semantics.
        """
        if sizehint == 0:
            return []

        # (lines read from one chunk, whether its last line is complete)
        lines_buf = []
        lines_to_read = sizehint
        while lines_to_read != 0:
            if self._buffered_size == 0:
                self._load_next_offset()
                if self._buffered_size == 0:
                    break  # no more data
            old_pos = self._buffer.tell()
            lines = self._buffer.readlines(lines_to_read)
            if not lines:
                break
            self._buffered_size -= self._buffer.tell() - old_pos

            if self._is_line_terminated(lines[-1]):
                lines_to_read -= len(lines)
                has_terminator = True
            else:
                # last line not complete
                lines_to_read -= len(lines) - 1
                has_terminator = False
            lines_to_read = max(-1, lines_to_read)
            lines_buf.append((lines, has_terminator))

        # merge fractions of lines split across chunks
        last_has_terminator = True
        res_line_groups = []
        for lines, has_terminator in lines_buf:
            if not last_has_terminator:
                res_line_groups[-1].append(lines[0])
                lines = lines[1:]
            if lines:
                res_line_groups.extend([line] for line in lines)
            last_has_terminator = has_terminator

        sep = b"" if self._open_binary else ""
        return [sep.join(group) for group in res_line_groups]

    def write(self, content):
        """Append *content* to the write buffer, first flushing the buffer as
        a part once it holds at least a chunk."""
        if self._buffered_size >= self._chunk_size:
            self.flush()
        content = self._convert(content)
        last_pos = self._buffer.tell()
        self._buffer.write(content)
        content_size = self._buffer.tell() - last_pos
        self._buffered_size += content_size
        self.size += content_size

    def writelines(self, seq):
        """Append every item of *seq*; same semantics as ``write``."""
        if self._buffered_size >= self._chunk_size:
            self.flush()
        seq = [self._convert(s) for s in seq]
        last_pos = self._buffer.tell()
        self._buffer.writelines(seq)
        content_size = self._buffer.tell() - last_pos
        self._buffered_size += content_size
        self.size += content_size

    def seek(self, pos, whence=compat.SEEK_SET):
        raise compat.UnsupportedOperation("File or stream is not seekable.")

    def seekable(self):
        return False

    def tell(self):
        """Return the number of units written so far."""
        return self.size

    def truncate(self, size=None):
        raise compat.UnsupportedOperation("File or stream is not seekable.")

    def flush(self):
        """Upload buffered written content as a temporary part resource.

        Does nothing in READ mode: there the buffer holds data fetched from
        the resource, which must be neither discarded nor uploaded.
        """
        if self.mode == FileResource.Mode.READ:
            return
        value = self._rebuild_buffer()
        if value.tell() > 0:
            res = self.resource.parent.create(
                name=self._build_part_resource_name(),
                type="file",
                temp=True,
                part=True,
                fileobj=value,
            )
            self._resource_parts.append(res)
            if self._open_binary:
                self._md5_digest.update(value.getvalue())
            else:
                self._md5_digest.update(value.getvalue().encode(self._encoding))

    def close(self):
        """In WRITE mode, flush the remaining buffer and merge all parts into
        the target resource. Read mode and repeated calls are no-ops."""
        if self.mode == FileResource.Mode.READ or not self._opened:
            return

        self.flush()
        if self._overwrite is None:
            self._overwrite = not self.resource._is_create()
        self.resource.parent.merge_part_files(
            self.resource,
            self._resource_parts,
            self._md5_digest.hexdigest(),
            overwrite=self._overwrite,
        )
        self._opened = False

    def _iter(self):
        return self

    def _next(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line