#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
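"""Tunnel support for ODPS volumes: session-based download and upload of
volume files, plus direct readers and writers for file-system style
volumes."""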
import logging
import struct
import sys
from requests.exceptions import StreamConsumedError
from .. import options, serializers
from ..compat import Enum, irange, six
from ..models import errors
from ..utils import to_binary, to_text
from . import io
from .base import BaseTunnel
from .checksum import Checksum
from .errors import TunnelError
logger = logging.getLogger(__name__)
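# Data on the wire is framed in chunks: the stream starts with a 4-byte
# big-endian chunk size, after which payload bytes are interleaved with
# 4-byte CRC32 checksums (see VolumeReader and VolumeWriter below).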
MAX_CHUNK_SIZE = 256 * 1024 * 1024
MIN_CHUNK_SIZE = 1
CHECKSUM_SIZE = 4
CHECKSUM_PACKER = ">i" if six.PY2 else ">I"
class VolumeTunnel(BaseTunnel):
"""
    Volume tunnel API entry.
:param odps: ODPS Entry object
:param str project: project name
:param str endpoint: tunnel endpoint
:param str quota_name: name of tunnel quota
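
    A minimal sketch of typical use (the volume, partition and file names
    here are illustrative)::

        tunnel = VolumeTunnel(odps)  # odps: an existing ODPS entry object
        session = tunnel.create_download_session(
            "my_volume", "my_partition", "my_file"
        )
        with session.open() as reader:
            data = reader.read()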
"""
def create_download_session(
self,
volume,
partition_spec,
file_name,
download_id=None,
compress_option=None,
compress_algo=None,
compress_level=None,
compress_strategy=None,
tags=None,
):
if not isinstance(volume, six.string_types):
volume = volume.name
volume = self._project.volumes[volume]
if compress_option is None and compress_algo is not None:
compress_option = io.CompressOption(
compress_algo=compress_algo,
level=compress_level,
strategy=compress_strategy,
)
return VolumeDownloadSession(
self.tunnel_rest,
volume,
partition_spec,
file_name,
download_id=download_id,
compress_option=compress_option,
quota_name=self._quota_name,
tags=tags,
)
def create_upload_session(
self,
volume,
partition_spec,
upload_id=None,
compress_option=None,
compress_algo=None,
compress_level=None,
compress_strategy=None,
tags=None,
):
if not isinstance(volume, six.string_types):
volume = volume.name
volume = self._project.volumes[volume]
if compress_option is None and compress_algo is not None:
compress_option = io.CompressOption(
compress_algo=compress_algo,
level=compress_level,
strategy=compress_strategy,
)
return VolumeUploadSession(
self.tunnel_rest,
volume,
partition_spec,
upload_id=upload_id,
compress_option=compress_option,
quota_name=self._quota_name,
tags=tags,
)
class VolumeFSTunnel(BaseTunnel):
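    """
    Tunnel entry for file-system style volumes. Unlike :class:`VolumeTunnel`,
    readers and writers are opened directly on a volume path, without an
    explicit download or upload session.
    """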
def open_reader(
self,
volume,
path,
start=None,
length=None,
compress_option=None,
compress_algo=None,
compress_level=None,
compress_strategy=None,
tags=None,
):
if not isinstance(volume, six.string_types):
volume = volume.name
volume = self._project.volumes[volume]
if start is None:
start = 0
if length is None:
file_obj = volume[path]
length = file_obj.length
headers = VolumeDownloadSession.get_common_headers(tags=tags)
headers.update(
{
"Range": "bytes={0}-{1}".format(start, start + length - 1),
"x-odps-volume-fs-path": "/" + volume.name + "/" + path.lstrip("/"),
}
)
        if compress_option is None and compress_algo is not None:
            compress_option = io.CompressOption(
                compress_algo=compress_algo,
                level=compress_level,
                strategy=compress_strategy,
            )
        if compress_option is not None:
if (
compress_option.algorithm
== io.CompressOption.CompressAlgorithm.ODPS_ZLIB
):
headers["Accept-Encoding"] = "deflate"
elif (
compress_option.algorithm
!= io.CompressOption.CompressAlgorithm.ODPS_RAW
):
raise TunnelError("invalid compression option")
url = volume.resource(client=self.tunnel_rest)
resp = self.tunnel_rest.get(url, headers=headers, stream=True)
if not self.tunnel_rest.is_ok(resp):
e = TunnelError.parse(resp)
raise e
content_encoding = resp.headers.get("Content-Encoding")
if content_encoding is not None:
compress = True
else:
compress = False
option = compress_option if compress else None
return VolumeReader(self.tunnel_rest, resp, option)
def open_writer(
self,
volume,
path,
replication=None,
compress_option=None,
compress_algo=None,
compress_level=None,
compress_strategy=None,
tags=None,
):
if not isinstance(volume, six.string_types):
volume = volume.name
volume = self._project.volumes[volume]
headers = VolumeUploadSession.get_common_headers(tags=tags)
headers.update(
{
"Content-Type": "application/octet-stream",
"Transfer-Encoding": "chunked",
"x-odps-volume-fs-path": "/" + volume.name + "/" + path.lstrip("/"),
}
)
params = {}
if compress_option is None and compress_algo is not None:
compress_option = io.CompressOption(
compress_algo=compress_algo,
level=compress_level,
strategy=compress_strategy,
)
if compress_option is not None:
if (
compress_option.algorithm
== io.CompressOption.CompressAlgorithm.ODPS_ZLIB
):
headers["Content-Encoding"] = "deflate"
elif (
compress_option.algorithm
!= io.CompressOption.CompressAlgorithm.ODPS_RAW
):
raise TunnelError("invalid compression option")
if replication:
params["replication"] = replication
url = volume.resource(client=self.tunnel_rest)
        def chunk_upload(data):
            return self.tunnel_rest.post(
                url, data=data, params=params, headers=headers
            )
return VolumeFSWriter(
self.tunnel_rest, chunk_upload, volume, path, compress_option, tags=tags
)
class BaseVolumeTunnelSession(serializers.JSONSerializableModel):
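    """Shared behavior for volume tunnel sessions: building the common
    request headers, including optional tunnel tags."""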
@staticmethod
def get_common_headers(content_length=None, tags=None):
header = {}
if content_length is not None:
header["Content-Length"] = content_length
tags = tags or options.tunnel.tags
if tags:
if isinstance(tags, six.string_types):
tags = tags.split(",")
header["odps-tunnel-tags"] = ",".join(tags)
return header
class VolumeDownloadSession(BaseVolumeTunnelSession):
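    """
    Download session for a single file in a volume partition. Usually
    created via :meth:`VolumeTunnel.create_download_session`; call
    :meth:`open` to obtain a :class:`VolumeReader` over the file content.
    """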
__slots__ = (
"_client",
"project_name",
"_compress_option",
"_quota_name",
"_tags",
)
class Status(Enum):
UNKNOWN = "UNKNOWN"
NORMAL = "NORMAL"
CLOSED = "CLOSED"
EXPIRED = "EXPIRED"
id = serializers.JSONNodeField("DownloadID")
status = serializers.JSONNodeField(
"Status", parse_callback=lambda v: VolumeDownloadSession.Status(v.upper())
)
file_name = serializers.JSONNodeField("File", "FileName")
file_length = serializers.JSONNodeField("File", "FileLength")
volume_name = serializers.JSONNodeField("Partition", "Volume")
partition_spec = serializers.JSONNodeField("Partition", "Partition")
def __init__(
self,
client,
volume,
partition_spec,
file_name=None,
download_id=None,
compress_option=None,
quota_name=None,
tags=None,
):
super(VolumeDownloadSession, self).__init__()
self._client = client
self._compress_option = compress_option
self._quota_name = quota_name
self.project_name = volume.project.name
self.volume_name = volume.name
self.partition_spec = partition_spec
self.file_name = file_name
self._tags = tags or options.tunnel.tags
if isinstance(self._tags, six.string_types):
self._tags = self._tags.split(",")
if download_id is None:
self._init()
else:
self.id = download_id
self.reload()
logger.info("Tunnel session created: %r", self)
if options.tunnel_session_create_callback:
options.tunnel_session_create_callback(self)
def __repr__(self):
return (
"<VolumeDownloadSession id=%s project_name=%s volume_name=%s partition_spec=%s>"
% (self.id, self.project_name, self.volume_name, self.partition_spec)
)
def resource(self, client=None, endpoint=None):
endpoint = (
endpoint if endpoint is not None else (client or self._client).endpoint
)
return endpoint + "/projects/%s/tunnel/downloads" % self.project_name
def _init(self):
headers = self.get_common_headers(content_length=0, tags=self._tags)
params = dict(
type="volumefile",
target="/".join(
[
self.project_name,
self.volume_name,
self.partition_spec,
self.file_name,
]
),
)
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource()
resp = self._client.post(url, {}, params=params, headers=headers)
if self._client.is_ok(resp):
self.parse(resp, obj=self)
else:
e = TunnelError.parse(resp)
raise e
def reload(self):
headers = self.get_common_headers(content_length=0, tags=self._tags)
params = {}
if self.partition_spec is not None and len(self.partition_spec) > 0:
params["partition"] = self.partition_spec
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource() + "/" + str(self.id)
resp = self._client.get(url, params=params, headers=headers)
if self._client.is_ok(resp):
self.parse(resp, obj=self)
else:
e = TunnelError.parse(resp)
raise e
def open(self, start=0, length=sys.maxsize):
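        # open a streaming reader over the requested byte range, expressed
        # to the server as "(start,length)"; deflate compression is
        # negotiated via the Accept-Encoding / Content-Encoding headers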
compress_option = self._compress_option or io.CompressOption()
params = {}
headers = self.get_common_headers(tags=self._tags)
headers.update({"Content-Length": 0, "x-odps-tunnel-version": 4})
if compress_option.algorithm == io.CompressOption.CompressAlgorithm.ODPS_ZLIB:
headers["Accept-Encoding"] = "deflate"
elif compress_option.algorithm != io.CompressOption.CompressAlgorithm.ODPS_RAW:
raise TunnelError("invalid compression option")
params["data"] = ""
params["range"] = "(%s,%s)" % (start, length)
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource()
resp = self._client.get(
url + "/" + self.id, params=params, headers=headers, stream=True
)
if not self._client.is_ok(resp):
e = TunnelError.parse(resp)
raise e
content_encoding = resp.headers.get("Content-Encoding")
if content_encoding is not None:
if content_encoding == "deflate":
self._compress_option = io.CompressOption(
io.CompressOption.CompressAlgorithm.ODPS_ZLIB, -1, 0
)
else:
raise TunnelError("Invalid content encoding")
compress = True
else:
compress = False
option = compress_option if compress else None
return VolumeReader(self._client, resp, option)
class VolumeReader(object):
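    """
    File-like reader over a tunnel download response. Verifies the CRC32
    checksum embedded after every chunk while reading, and supports sized
    reads, line-based iteration and use as a context manager.
    """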
def __init__(self, client, response, compress_option):
self._client = client
self._response = io.RequestsInputStream(response)
self._compress_option = compress_option
self._crc = Checksum(method="crc32")
self._buffer_size = 0
self._initialized = False
self._last_line_ending = None
self._eof = False
# buffer part left by sized read or read-line operation, see read()
self._left_part = None
self._left_part_pos = 0
# left part of checksum block when chunked, see _read_buf()
self._chunk_left = None
def _raw_read(self, size):
return self._response.read(size)
def _init_buf(self):
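        # the stream begins with a 4-byte big-endian chunk size chosen by
        # the writer; the size bytes themselves are part of the running CRC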
size_buf = self._raw_read(4)
if not size_buf:
raise IOError("Tunnel reader breaks unexpectedly.")
self._crc.update(size_buf)
chunk_size = struct.unpack(">I", size_buf)[0]
if chunk_size > MAX_CHUNK_SIZE or chunk_size < MIN_CHUNK_SIZE:
raise IOError(
"ChunkSize should be in [%d, %d], now is %d."
% (MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, chunk_size)
)
self._buffer_size = CHECKSUM_SIZE + chunk_size
def _read_buf(self):
has_stuff = False
data_buffer = six.BytesIO()
if self._chunk_left:
# we have cached chunk left, add to buffer
data_buffer.write(self._chunk_left)
self._chunk_left = None
while data_buffer.tell() < self._buffer_size:
try:
# len(buf) might be less than _buffer_size
buf = self._raw_read(self._buffer_size)
if not buf:
break
data_buffer.write(buf)
has_stuff = True
except StopIteration:
break
except StreamConsumedError:
break
if not has_stuff:
return None
# check if we need to store the rest part.
if data_buffer.tell() <= self._buffer_size:
buf = data_buffer.getvalue()
else:
buf_all = data_buffer.getvalue()
buf, self._chunk_left = (
buf_all[: self._buffer_size],
buf_all[self._buffer_size :],
)
if len(buf) >= CHECKSUM_SIZE:
self._data_size = len(buf) - CHECKSUM_SIZE
self._crc.update(buf[: self._data_size])
checksum = struct.unpack_from(CHECKSUM_PACKER, buf, self._data_size)[0]
if checksum != self._crc.getvalue():
raise IOError("CRC check error in VolumeReader.")
else:
raise IOError("Invalid VolumeReader.")
return bytearray(buf[: self._data_size])
def read(self, size=None, break_line=False):
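        # with break_line=True this behaves like readline(): it returns one
        # line at a time and normalizes "\r" and "\r\n" endings to "\n",
        # including endings that straddle a chunk boundary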
if size is None:
size = sys.maxsize
if self._eof:
return None
if size == 0:
return six.binary_type()
if not self._initialized:
self._initialized = True
self._init_buf()
has_stuff = False
out_buf = six.BytesIO()
if self._left_part:
if break_line:
# deal with Windows line endings
if self._left_part[self._left_part_pos] == ord(
"\n"
) and self._last_line_ending == ord("\r"):
self._last_line_ending = None
self._left_part_pos += 1
for idx in irange(self._left_part_pos, len(self._left_part)):
if self._left_part[idx] not in (ord("\r"), ord("\n")):
continue
self._last_line_ending = self._left_part[idx]
self._left_part[idx] = ord("\n")
ret = self._left_part[self._left_part_pos : idx + 1]
self._left_part_pos = idx + 1
if self._left_part_pos == len(self._left_part):
self._left_part = None
self._left_part_pos = 0
return bytes(ret)
if len(self._left_part) - self._left_part_pos >= size:
ret = self._left_part[self._left_part_pos : self._left_part_pos + size]
self._left_part_pos += size
return bytes(ret)
else:
out_buf.write(bytes(self._left_part[self._left_part_pos :]))
self._left_part = None
self._left_part_pos = 0
has_stuff = True
length_left = size - out_buf.tell()
while length_left > 0:
buf = self._read_buf()
if buf is None:
self._eof = True
break
has_stuff = True
start_pos = 0
if break_line:
if buf[0] == ord("\n") and self._last_line_ending == ord("\r"):
start_pos = 1
for idx in irange(start_pos, len(buf)):
if buf[idx] not in (ord("\r"), ord("\n")):
continue
self._last_line_ending = buf[idx]
buf[idx] = ord("\n")
out_buf.write(bytes(buf[start_pos : idx + 1]))
if idx + 1 < len(buf):
self._left_part = buf[idx + 1 :]
self._left_part_pos = 0
return out_buf.getvalue()
if len(buf) >= length_left:
out_buf.write(bytes(buf[start_pos : start_pos + length_left]))
if len(buf) > length_left:
self._left_part = buf[start_pos + length_left :]
self._left_part_pos = 0
length_left = 0
else:
out_buf.write(bytes(buf[start_pos : start_pos + self._data_size]))
length_left -= self._data_size
return out_buf.getvalue() if has_stuff else None
def _it(self, size=sys.maxsize, encoding="utf-8"):
while True:
line = self.readline(size, encoding=encoding)
if line is None:
break
yield line
def readline(self, size=sys.maxsize, encoding="utf-8"):
line = self.read(size, break_line=True)
return to_text(line, encoding=encoding)
def readlines(self, size=sys.maxsize, encoding="utf-8"):
return [line for line in self._it(size, encoding=encoding)]
def __iter__(self):
return self._it()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
class VolumeUploadSession(BaseVolumeTunnelSession):
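    """
    Upload session for files in a volume partition. Usually created via
    :meth:`VolumeTunnel.create_upload_session`. A sketch of typical use
    (the file name is illustrative)::

        with session.open("part_0") as writer:
            writer.write(b"data")
        session.commit(["part_0"])
    """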
__slots__ = (
"_client",
"_compress_option",
"project_name",
"volume_name",
"partition_spec",
"_quota_name",
"_tags",
)
class Status(Enum):
UNKNOWN = "UNKNOWN"
NORMAL = "NORMAL"
CLOSING = "CLOSING"
CLOSED = "CLOSED"
CANCELED = "CANCELED"
EXPIRED = "EXPIRED"
CRITICAL = "CRITICAL"
class UploadFile(serializers.JSONSerializableModel):
file_name = serializers.JSONNodeField("FileName")
file_length = serializers.JSONNodeField("FileLength")
id = serializers.JSONNodeField("UploadID")
status = serializers.JSONNodeField(
"Status", parse_callback=lambda v: VolumeUploadSession.Status(v.upper())
)
file_list = serializers.JSONNodesReferencesField(UploadFile, "FileList")
def __init__(
self,
client,
volume,
partition_spec,
upload_id=None,
compress_option=None,
quota_name=None,
tags=None,
):
super(VolumeUploadSession, self).__init__()
self._client = client
self._compress_option = compress_option
self._quota_name = quota_name
self.project_name = volume.project.name
self.volume_name = volume.name
self.partition_spec = partition_spec
self._tags = tags or options.tunnel.tags
if isinstance(self._tags, six.string_types):
self._tags = self._tags.split(",")
if upload_id is None:
self._init()
else:
self.id = upload_id
self.reload()
self._compress_option = compress_option
logger.info("Tunnel session created: %r", self)
if options.tunnel_session_create_callback:
options.tunnel_session_create_callback(self)
def __repr__(self):
return (
"<VolumeUploadSession id=%s project_name=%s volume_name=%s partition_spec=%s>"
% (self.id, self.project_name, self.volume_name, self.partition_spec)
)
def resource(self, client=None, endpoint=None):
endpoint = (
endpoint if endpoint is not None else (client or self._client).endpoint
)
return endpoint + "/projects/%s/tunnel/uploads" % self.project_name
def _init(self):
headers = self.get_common_headers(content_length=0, tags=self._tags)
params = dict(
type="volumefile",
target="/".join([self.project_name, self.volume_name, self.partition_spec])
+ "/",
)
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource()
resp = self._client.post(url, {}, params=params, headers=headers)
if self._client.is_ok(resp):
self.parse(resp, obj=self)
else:
e = TunnelError.parse(resp)
raise e
def reload(self):
headers = self.get_common_headers(content_length=0, tags=self._tags)
params = {}
url = self.resource() + "/" + str(self.id)
resp = self._client.get(url, params=params, headers=headers)
if self._client.is_ok(resp):
self.parse(resp, obj=self)
else:
e = TunnelError.parse(resp)
raise e
@staticmethod
def _format_file_name(file_name):
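        # reject file names starting with '/' and collapse consecutive
        # slashes into a single one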
buf = six.StringIO()
if file_name and file_name[0] == "/":
raise TunnelError(
"FileName cannot start with '/', file name is " + file_name
)
pre_slash = False
for ch in file_name:
if ch == "/":
if not pre_slash:
buf.write(ch)
pre_slash = True
else:
buf.write(ch)
pre_slash = False
return buf.getvalue()
def open(self, file_name, compress=False, append=False):
compress_option = self._compress_option or io.CompressOption()
headers = self.get_common_headers(tags=self._tags)
headers.update(
{
"Content-Type": "test/plain",
"Transfer-Encoding": "chunked",
"x-odps-tunnel-version": 4,
}
)
params = {}
if compress:
if (
compress_option.algorithm
== io.CompressOption.CompressAlgorithm.ODPS_ZLIB
):
headers["Content-Encoding"] = "deflate"
elif (
compress_option.algorithm
!= io.CompressOption.CompressAlgorithm.ODPS_RAW
):
raise TunnelError("invalid compression option")
file_name = self._format_file_name(file_name)
params["blockid"] = file_name
if append:
params["resume"] = ""
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource() + "/" + self.id
        def chunk_uploader(data):
            return self._client.post(
                url, data=data, params=params, headers=headers
            )
option = compress_option if compress else None
return VolumeWriter(self._client, chunk_uploader, option, tags=self._tags)
def commit(self, files):
if not files:
raise ValueError("`files` not supplied")
if isinstance(files, six.string_types):
files = [files]
formatted = [self._format_file_name(fn) for fn in files]
self.reload()
files_uploading = set(f.file_name for f in self.file_list)
if len(files_uploading) != len(formatted):
raise TunnelError(
"File number not match, server: %d, client: %d"
% (len(files_uploading), len(formatted))
)
for fn in (fn for fn in formatted if fn not in files_uploading):
raise TunnelError("File not exits on server, file name is " + fn)
self._complete_upload()
def _complete_upload(self):
headers = self.get_common_headers(content_length=0, tags=self._tags)
params = {}
if self._quota_name is not None:
params["quotaName"] = self._quota_name
url = self.resource() + "/" + self.id
resp = self._client.put(url, {}, params=params, headers=headers)
if self._client.is_ok(resp):
self.parse(resp, obj=self)
else:
e = TunnelError.parse(resp)
raise e
class VolumeWriter(object):
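    """
    Writer producing the chunk-framed wire format: a 4-byte chunk size
    header up front, a CRC32 checksum after every CHUNK_SIZE payload bytes,
    and a final checksum for the trailing partial chunk on close.
    """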
CHUNK_SIZE = 512 * 1024
def __init__(self, client, uploader, compress_option, tags=None):
self._client = client
self._compress_option = compress_option
self._req_io = io.RequestsIO(uploader, chunk_size=options.chunk_size)
if compress_option is None:
self._writer = self._req_io
elif compress_option.algorithm == io.CompressOption.CompressAlgorithm.ODPS_RAW:
self._writer = self._req_io
elif compress_option.algorithm == io.CompressOption.CompressAlgorithm.ODPS_ZLIB:
self._writer = io.DeflateOutputStream(self._req_io)
else:
raise errors.InvalidArgument("Invalid compression algorithm.")
self._tags = tags or options.tunnel.tags
if isinstance(self._tags, six.string_types):
self._tags = self._tags.split(",")
self._crc = Checksum(method="crc32")
self._initialized = False
self._chunk_offset = 0
def _init_writer(self):
chunk_bytes = struct.pack(">I", self.CHUNK_SIZE)
self._writer.write(chunk_bytes)
self._crc.update(chunk_bytes)
self._chunk_offset = 0
def write(self, buf, encoding="utf-8"):
buf = to_binary(buf, encoding=encoding)
if isinstance(buf, six.integer_types):
buf = bytes(bytearray([buf]))
elif isinstance(buf, six.BytesIO):
buf = buf.getvalue()
if not self._initialized:
self._initialized = True
self._init_writer()
self._req_io.start()
if not buf:
raise IOError("Invalid data buffer!")
processed = 0
while processed < len(buf):
if self._chunk_offset == self.CHUNK_SIZE:
checksum = self._crc.getvalue()
self._writer.write(struct.pack(CHECKSUM_PACKER, checksum))
self._chunk_offset = 0
else:
size = (
self.CHUNK_SIZE - self._chunk_offset
if len(buf) - processed > self.CHUNK_SIZE - self._chunk_offset
else len(buf) - processed
)
write_chunk = buf[processed : processed + size]
self._writer.write(write_chunk)
self._crc.update(write_chunk)
processed += size
self._chunk_offset += size
def close(self):
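        # emit the checksum of the trailing partial chunk, then finish the
        # chunked request and validate the response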
if not self._initialized:
self._initialized = True
self._init_writer()
if self._chunk_offset != 0:
checksum = self._crc.getvalue()
self._writer.write(struct.pack(CHECKSUM_PACKER, checksum))
self._writer.flush()
result = self._req_io.finish()
if result is None:
raise TunnelError("No results returned in VolumeWriter.")
if not self._client.is_ok(result):
e = TunnelError.parse(result)
raise e
return result
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# if an error occurs inside the with block, we do not commit
if exc_val is not None:
return
self.close()
class VolumeFSWriter(VolumeWriter):
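    """
    Writer for file-system style volumes. Closing the writer commits the
    uploaded data with a second request carrying the volume session id
    returned by the upload.
    """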
def __init__(self, client, uploader, volume, path, compress_option, tags=None):
self._volume = volume
self._path = path
super(VolumeFSWriter, self).__init__(
client, uploader, compress_option, tags=tags
)
def close(self):
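        # the upload response carries a volume session id; committing it
        # with a follow-up PUT against the volume resource finalizes the file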
result = super(VolumeFSWriter, self).close()
if "x-odps-volume-sessionid" not in result.headers:
raise TunnelError("No session id returned in response.")
headers = VolumeUploadSession.get_common_headers(tags=self._tags)
headers.update(
{
"x-odps-volume-fs-path": "/"
+ self._volume.name
+ "/"
+ self._path.lstrip("/"),
"x-odps-volume-sessionid": result.headers.get(
"x-odps-volume-sessionid"
),
}
)
commit_result = self._client.put(
self._volume.resource(client=self._client), None, headers=headers
)
if not self._client.is_ok(commit_result):
e = TunnelError.parse(commit_result)
raise e
return result