odps/models/resources.py (147 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import errors, serializers
from ..compat import six
from .core import Iterable
from .resource import FileResource, Resource
# Default resource chunk size in bytes (64 MiB). It is not referenced inside
# this module; presumably it is consumed by resource upload code elsewhere
# (verify before changing).
DEFAULT_RESOURCE_CHUNK_SIZE = 64 * 1024 * 1024
class Resources(Iterable):
    """Collection of resources under ``self.parent``.

    Listing pages returned by the server are parsed into the fields below.
    The methods look up, iterate, create, update and delete individual
    resources, and read their content.
    """

    # Pagination marker of the last parsed listing page; ``iterate`` stops
    # once the server sends back an empty (or missing) marker.
    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems")
    # Resource entries parsed from the <Resource> nodes of a listing page.
    resources = serializers.XMLNodesReferencesField(Resource, "Resource")

    def get_typed(self, name, type, **kw):
        """Build a resource object of the given type.

        :param name: resource name, optionally qualified with a project (and
            schema) as understood by ``Resource.split_resource_name``
        :param type: resource type used to select the ``Resource`` subclass
        :param kw: extra constructor arguments; ``temp`` is accepted as an
            alias for ``is_temp_resource``
        :return: an instance of the selected ``Resource`` subclass
        """
        type_cls = Resource._get_cls(type)
        parent = self
        project_name, schema_name, name = Resource.split_resource_name(name)
        if project_name is not None:
            # Qualified name: use the resource collection of the referenced
            # project (and schema, when given) rather than this collection.
            parent = self.parent.project.parent[project_name]
            if schema_name is not None:
                parent = parent.schemas[schema_name]
            parent = parent.resources
        if "temp" in kw:
            kw["is_temp_resource"] = kw.pop("temp")
        return type_cls(client=self._client, parent=parent, name=name, **kw)

    def _get(self, name):
        """Return an untyped resource object for ``name``."""
        return self.get_typed(name, None)

    def _get_parent_typed(self, item):
        """Return a bare ``Resource`` of unknown type parented to this collection."""
        # ``UNKOWN`` (sic) is the enum member name used by ``Resource.Type``.
        return Resource(
            client=self._client, name=item, parent=self, type=Resource.Type.UNKOWN
        )

    def __contains__(self, item):
        """Return True if ``item`` (a name or a ``Resource``) can be reloaded.

        Existence is probed by calling ``reload()``; ``errors.NoSuchObject``
        is treated as "not contained". Items of any other type are never
        contained.
        """
        if isinstance(item, six.string_types):
            try:
                resource = self._get(item)
            except errors.NoSuchObject:
                return False
        elif isinstance(item, Resource):
            resource = item
        else:
            return False
        try:
            resource.reload()
        except errors.NoSuchObject:
            return False
        return True

    def __iter__(self):
        return self.iterate()

    def get(self, name, type=None):
        """Return a resource object for ``name``; ``type`` selects the subclass."""
        return self.get_typed(name, type)

    def iterate(self, name=None, owner=None):
        """Yield every resource in this collection, page by page.

        :param name: optional value sent as the ``name`` query parameter
        :param owner: optional value sent as the ``owner`` query parameter
        """
        params = {"expectmarker": "true"}
        if name is not None:
            params["name"] = name
        if owner is not None:
            params["owner"] = owner
        schema_name = self._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        def _it():
            # Fetch one listing page. Returns None once a previous page came
            # back with an empty marker, i.e. there are no more pages.
            last_marker = params.get("marker")
            if "marker" in params and (last_marker is None or len(last_marker) == 0):
                return None
            url = self.resource()
            resp = self._client.get(url, params=params)
            r = Resources.parse(self._client, resp, obj=self)
            params["marker"] = r.marker
            return r.resources

        while True:
            resources = _it()
            if resources is None:
                break
            for resource in resources:
                yield resource

    def _attach_resource(self, obj):
        """Give ``obj`` this collection as parent and this client if it lacks them."""
        if obj.parent is None:
            obj._parent = self
        if obj._client is None:
            obj._client = self._client
        return obj

    def create(self, obj=None, **kwargs):
        """Create a resource, calling ``obj.create`` with ``overwrite=False``.

        :param obj: an existing ``Resource`` object to create; when omitted, a
            new one is built from ``kwargs`` (which must then contain ``type``)
        :param kwargs: resource attributes and content arguments; ``temp`` and
            ``part`` are aliases for ``is_temp_resource`` / ``is_part_resource``
        :raises ValueError: if the resource type is unknown
        """
        if obj is None and "type" not in kwargs:
            raise ValueError("Unknown resource type to create.")
        if "temp" in kwargs:
            kwargs["is_temp_resource"] = kwargs.pop("temp")
        if "part" in kwargs:
            kwargs["is_part_resource"] = kwargs.pop("part")
        if obj is None:
            # Content arguments are consumed by ``create`` below, not by the
            # constructor.
            ctor_kw = kwargs.copy()
            ctor_kw.pop("file_obj", None)
            ctor_kw.pop("fileobj", None)
            obj = Resource(parent=self, client=self._client, **ctor_kw)
        if obj.type == Resource.Type.UNKOWN:
            raise ValueError("Unknown resource type to create.")
        self._attach_resource(obj)
        return obj.create(overwrite=False, **kwargs)

    def update(self, obj, **kwargs):
        """Re-create ``obj`` with ``overwrite=True``.

        Like ``create``, attaches this collection and client to ``obj`` when
        it has none, so detached resource objects can be updated too.
        """
        self._attach_resource(obj)
        return obj.create(overwrite=True, **kwargs)

    def delete(self, name):
        """Delete a resource given its name or a ``Resource`` object."""
        if isinstance(name, Resource):
            resource = name
            name = name.name
        else:
            resource = self._get_parent_typed(name)
        del self[name]  # release cache
        url = resource.resource()
        self._client.delete(url, curr_schema=self._get_schema_name())

    def _request(self, name, stream=False, offset=None, read_size=None):
        """GET the content of a resource and return the raw response.

        :param name: resource name or ``FileResource`` object
        :param stream: forwarded to the client's ``get``
        :param offset: sent as ``rOffset`` when given (presumably a byte offset)
        :param read_size: sent as ``rSize`` when given (presumably a byte count)
        """
        if isinstance(name, FileResource):
            res = name
        else:
            res = Resource(name, parent=self, client=self._client)
        url = res.resource()
        headers = {"Content-Type": "application/octet-stream"}
        params = {}
        if offset is not None:
            params["rOffset"] = str(offset)
        if read_size is not None:
            params["rSize"] = str(read_size)
        return self._client.get(
            url,
            headers=headers,
            params=params,
            stream=stream,
            curr_schema=self._get_schema_name(),
        )

    def iter_resource_content(self, name, text_mode=False, chunk_size=64 * 1024):
        """Stream the content of a resource chunk by chunk.

        :param name: resource name or ``FileResource`` object
        :param text_mode: forwarded as ``decode_unicode`` to ``iter_content``
        :param chunk_size: maximum size of each yielded chunk. Assumes the
            response is a ``requests.Response``, whose ``iter_content``
            defaults to 1-byte chunks when ``chunk_size`` is omitted, which
            makes streaming large resources extremely slow.
        """
        resp = self._request(name, stream=True)
        return resp.iter_content(chunk_size=chunk_size, decode_unicode=text_mode)

    def read_resource(
        self, name, encoding="utf-8", text_mode=False, offset=None, read_size=None
    ):
        """Read (part of) a resource's content into an in-memory file object.

        :return: a ``BytesIO``, or a ``StringIO`` decoded with ``encoding``
            when ``text_mode`` is true. The object carries an extra ``is_eof``
            attribute, which is False only when the
            ``x-odps-resource-has-remaining`` response header equals "true"
            (case-insensitive).
        """
        resp = self._request(name, offset=offset, read_size=read_size)
        content = resp.content
        if text_mode:
            if isinstance(content, six.binary_type):
                content = content.decode(encoding)
            sio = six.StringIO(content)
        else:
            if isinstance(content, six.text_type):
                content = content.encode(encoding)  # read as bytes
            sio = six.BytesIO(content)
        has_remaining = resp.headers.get("x-odps-resource-has-remaining") or "false"
        sio.is_eof = has_remaining.lower() != "true"
        return sio

    def merge_part_files(self, resource, part_resources, md5_hex, overwrite=False):
        """Create the resource described by ``resource`` from its part resources.

        The request body is ``"<md5_hex>|<part name>,<part name>,..."``, and
        the summed ``size`` of the parts is passed as ``merge_total_bytes``.

        :param part_resources: iterable of part resource objects; any
            iterable (including a generator) is accepted
        :return: the value returned by ``Resource.create`` for the merged resource
        """
        # Names and sizes are read in two separate passes; materialize once so
        # a one-shot iterator does not yield nothing (total 0) on the second.
        part_resources = list(part_resources)
        content = md5_hex + "|" + ",".join(res.name for res in part_resources)
        total_bytes = sum(res.size for res in part_resources)
        resource_args = resource.extract()
        resource_args.update(
            {"parent": self, "client": self._client, "merge_total_bytes": total_bytes}
        )
        merge_res = Resource(**resource_args)
        return merge_res.create(overwrite=overwrite, fileobj=content)