odps/tempobj.py (407 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import copy
import glob
import hashlib
import json
import os
import stat
import subprocess
import sys
import tempfile
import threading
import time
import uuid
from . import utils
from .accounts import AliyunAccount
from .compat import builtins, futures, pickle, six
from .config import options
from .errors import NoSuchObject
# Root directory for history files; they are stored below it as
# <TEMP_ROOT>/<biz_id>/<repository key>/*.his
TEMP_ROOT = utils.build_pyodps_dir("tempobjs")
# Start time (seconds) plus a random UUID; embedded in this process' history
# file name together with the pid.
SESSION_KEY = "%d_%s" % (int(time.time()), uuid.uuid4())
# Number of worker threads ObjectRepository.cleanup uses when use_threads is set.
CLEANER_THREADS = 100
# Owner-only read/write/execute bits, applied to history files and to the
# generated cleanup script.
USER_FILE_RIGHTS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
# Source of the stand-alone script that ObjectRepositoryLib._exec_cleanup_script
# writes to the temp directory and runs with the current interpreter.
#
# ``str.format`` placeholders (all inserted with ``!r`` except ``host_pid``):
#   odps_info    - JSON list of keyword-argument dicts used to rebuild ODPS objects
#   import_paths - JSON list of import paths of the host process
#   biz_ids      - JSON list of biz ids whose history files should be scanned
#   host_pid     - pid of the process that generated the script
#
# The script deletes its own file first, switches ``tempobj`` into cleanup
# mode and then instantiates every ODPS entry.  The nested statements must stay
# indented: the text is executed as Python, so indentation is part of its
# behaviour.
CLEANUP_SCRIPT_TMPL = u"""
#-*- coding:utf-8 -*-
import os
import sys
import json
try:
    os.unlink(os.path.realpath(__file__))
except Exception:
    pass
temp_codes = json.loads({odps_info!r})
import_paths = json.loads({import_paths!r})
biz_ids = json.loads({biz_ids!r})
if sys.version_info[0] < 3:
    if sys.platform == 'win32':
        import_paths = [p.encode('mbcs') for p in import_paths]
    else:
        import_paths = [p.encode() for p in import_paths]
normed_paths = set(os.path.normcase(os.path.normpath(p)) for p in sys.path)
import_paths = [p for p in import_paths
                if os.path.normcase(os.path.normpath(p)) not in normed_paths]
sys.path.extend(import_paths)
from odps import ODPS, tempobj
if os.environ.get('WAIT_CLEANUP') == '1':
    tempobj.cleanup_timeout = None
else:
    tempobj.cleanup_timeout = 5
tempobj.cleanup_mode = True
tempobj.host_pid = {host_pid}
tempobj.ObjectRepositoryLib.biz_ids = set(biz_ids)
for o_desc in temp_codes:
    ODPS(**o_desc)
os._exit(0)
""".lstrip()
# Set to True by the generated cleanup script; while it is set,
# ObjectRepositoryLib._exec_cleanup_script does nothing.
cleanup_mode = False
# How clean_stored_objects waits for its worker thread: 0 returns without
# joining, None (or a negative value) joins without a timeout, a positive
# value is passed to Thread.join as the timeout.
cleanup_timeout = 0
# History files written by this pid are cleaned even if the pid is still
# running; the cleanup script overwrites this with the pid of its host process.
host_pid = os.getpid()
if six.PY3:  # make flake8 happy
    unicode = str
class ExecutionEnv(object):
    """
    Snapshot of the modules and values needed to generate and launch the
    cleanup script.

    Everything is captured on the instance at construction time, presumably so
    that it stays reachable when the script is triggered late (``atexit`` or
    ``__del__``) -- TODO confirm. Any keyword argument overrides the attribute
    of the same name.
    """

    def __init__(self, **kwargs):
        self.cleaned = False

        # module references
        self.os = os
        self.sys = sys
        self.six = six
        self.builtins = builtins
        self.subprocess = subprocess
        self.io = __import__("io", fromlist=[""])
        self._g_env = copy.copy(globals())

        # facts about the current process
        self.pid = os.getpid()
        self.is_windows = sys.platform == "win32"
        self.os_sep = os.sep
        self.executable = sys.executable

        # import paths handed over to the cleanup script
        # NOTE(review): the appended entry is the directory that holds this
        # module, not its parent -- confirm this is the intended path.
        search_paths = copy.deepcopy(sys.path)
        module_dir = os.path.dirname(__file__)
        if module_dir not in search_paths:
            search_paths.append(module_dir)
        paths_json = json.dumps(search_paths, ensure_ascii=False)
        self.import_path_json = utils.to_text(paths_json)

        # text/bytes converters for the running interpreter
        text_type = str if six.PY3 else unicode

        def _to_bytes(s):
            if isinstance(s, text_type):
                return s.encode()
            return s

        def _to_unicode(s):
            if isinstance(s, text_type):
                return s
            return s.decode()

        self.conv_bytes = _to_bytes
        self.conv_unicode = _to_unicode

        self.temp_dir = tempfile.gettempdir()
        self.template = CLEANUP_SCRIPT_TMPL
        self.file_right = USER_FILE_RIGHTS
        self.is_main_process = utils.is_main_process()

        # caller-supplied overrides are applied last
        for attr_name, attr_value in six.iteritems(kwargs):
            setattr(self, attr_name, attr_value)
class TempObject(object):
    """
    Base class of temporary-object descriptors.

    Subclasses list their fields in ``__slots__``; the fields define
    construction, equality, hashing and the pickled state. ``_type`` names the
    kind of object and ``_priority`` is used to order non-threaded cleanup
    (higher values are dropped first).
    """

    __slots__ = ()
    _type = ""
    _priority = 0

    def __init__(self, *args, **kwargs):
        # positional values fill the slots in declaration order
        for name, value in zip(self.__slots__, args):
            setattr(self, name, value)
        # slots still unset are taken from keywords, defaulting to None
        for name in self.__slots__:
            if not hasattr(self, name):
                setattr(self, name, kwargs.get(name))

    def __hash__(self):
        if not self.__slots__:
            return super(TempObject, self).__hash__()
        values = tuple(getattr(self, name, None) for name in self.__slots__)
        return hash(values)

    def __eq__(self, other):
        if not isinstance(other, TempObject) or self._type != other._type:
            return False
        for name in self.__slots__:
            if not (getattr(self, name) == getattr(other, name)):
                return False
        return True

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal

    def __getstate__(self):
        # only slots that have been assigned become part of the state
        state = {}
        for slot in self.__slots__:
            if hasattr(self, slot):
                state[slot] = getattr(self, slot)
        return state

    def __setstate__(self, state):
        for slot in state:
            setattr(self, slot, state[slot])
class TempTable(TempObject):
    """Descriptor of a temporary table."""

    __slots__ = ("table", "project", "schema")
    _type = "Table"

    def drop(self, odps):
        """Delete the table through ``odps``, passing ``if_exists=True`` and ``async_=True``."""
        options_ = dict(
            if_exists=True,
            project=self.project,
            schema=self.schema,
            async_=True,
        )
        odps.delete_table(self.table, **options_)
class TempModel(TempObject):
    """Descriptor of a temporary offline model."""

    __slots__ = ("model", "project", "schema")
    _type = "OfflineModel"

    def drop(self, odps):
        """Delete the offline model through ``odps``; a missing model is ignored."""
        location = dict(project=self.project, schema=self.schema)
        try:
            odps.delete_offline_model(self.model, **location)
        except NoSuchObject:
            pass
class TempFunction(TempObject):
    """Descriptor of a temporary function."""

    __slots__ = ("function", "project", "schema")
    _type = "Function"
    # higher than the default 0, so functions go first in non-threaded cleanup
    _priority = 1

    def drop(self, odps):
        """Delete the function through ``odps``; a missing function is ignored."""
        location = dict(project=self.project, schema=self.schema)
        try:
            odps.delete_function(self.function, **location)
        except NoSuchObject:
            pass
class TempResource(TempObject):
    """Descriptor of a temporary resource."""

    __slots__ = ("resource", "project", "schema")
    _type = "Resource"

    def drop(self, odps):
        """Delete the resource through ``odps``; a missing resource is ignored."""
        location = dict(project=self.project, schema=self.schema)
        try:
            odps.delete_resource(self.resource, **location)
        except NoSuchObject:
            pass
class TempVolumePartition(TempObject):
    """Descriptor of a temporary volume partition."""

    __slots__ = ("volume", "partition", "project", "schema")
    _type = "VolumePartition"

    def drop(self, odps):
        """Delete the volume partition through ``odps``; a missing one is ignored."""
        location = dict(project=self.project, schema=self.schema)
        try:
            odps.delete_volume_partition(self.volume, self.partition, **location)
        except NoSuchObject:
            pass
class ObjectRepository(object):
    """
    Set of temporary objects, optionally persisted to a history file.

    :param file_name: path of the history file, or ``None`` to keep the
        repository in memory only. An existing file is loaded on construction.
    """

    def __init__(self, file_name):
        self._container = set()
        self._file_name = file_name
        if file_name and os.path.exists(file_name):
            self.load()

    def put(self, obj, dump=True):
        """Add ``obj`` to the repository and, if ``dump`` is true, persist it."""
        self._container.add(obj)
        if dump:
            self.dump()

    def cleanup(self, odps, use_threads=True):
        """
        Drop every stored object through ``odps``.

        Objects whose ``drop`` raises are kept for a later attempt. When
        nothing is left the history file is removed, otherwise the remaining
        objects are written back.

        :param odps: entry object handed to each object's ``drop`` method
        :param use_threads: drop objects concurrently in a thread pool; when
            false, objects are dropped one by one in descending ``_priority``
        """
        cleaned = []

        def _cleaner(obj):
            # best effort: one failing object must not stop the others
            try:
                obj.drop(odps)
            except Exception:
                return
            cleaned.append(obj)

        if self._container:
            if use_threads:
                # the context manager shuts the pool down once all drops finish
                with futures.ThreadPoolExecutor(CLEANER_THREADS) as pool:
                    list(pool.map(_cleaner, reversed(list(self._container))))
            else:
                ordered = sorted(
                    self._container,
                    key=lambda ro: type(ro)._priority,
                    reverse=True,
                )
                for o in ordered:
                    _cleaner(o)
        self._container.difference_update(cleaned)

        if not self._container and self._file_name:
            try:
                os.unlink(self._file_name)
            except OSError:
                pass
        else:
            self.dump()

    def dump(self):
        """Write the stored objects to the history file; write errors are ignored."""
        if self._file_name is None:
            return
        try:
            with open(self._file_name, "wb") as outf:
                pickle.dump(list(self._container), outf, protocol=0)
        except (IOError, OSError):
            # IOError is listed separately because it is not an OSError on Python 2
            return
        os.chmod(self._file_name, USER_FILE_RIGHTS)

    def load(self):
        """Merge the objects stored in the history file into this repository."""
        try:
            with open(self._file_name, "rb") as inpf:
                # NOTE: unpickling can execute arbitrary code; the file is
                # expected to be one written by dump(), which restricts it to
                # the owner.
                contents = pickle.load(inpf)
            self._container.update(contents)
        except (EOFError, IOError, OSError):
            pass
class ObjectRepositoryLib(dict):
    """
    Mapping from repository key to :class:`ObjectRepository` that also knows
    how to spawn the cleanup script.

    Class-level state, shared by all instances:

    * ``biz_ids`` / ``biz_ids_json`` -- biz ids whose history files are scanned
    * ``odps_info`` / ``odps_info_json`` -- connection arguments per repository
      key; the JSON form is embedded in the cleanup script
    """

    biz_ids = set([options.biz_id]) if options.biz_id else set(["default"])
    odps_info = dict()

    # JSON forms are kept in sync by add_biz_id / add_odps_info
    biz_ids_json = json.dumps(list(biz_ids))
    odps_info_json = json.dumps([v for v in six.itervalues(odps_info)])

    def __init__(self, *args, **kwargs):
        super(ObjectRepositoryLib, self).__init__(*args, **kwargs)
        self._env = ExecutionEnv()

    def __del__(self):
        self._exec_cleanup_script()

    @classmethod
    def add_biz_id(cls, biz_id):
        """Register ``biz_id`` and refresh the cached JSON list."""
        cls.biz_ids.add(biz_id)
        cls.biz_ids_json = json.dumps(list(cls.biz_ids))

    @classmethod
    def add_odps_info(cls, odps):
        """
        Remember the connection arguments of ``odps`` for the cleanup script.

        Nothing is stored when no repository key can be derived from ``odps``.
        The stored dict includes the secret access key.
        """
        odps_key = _gen_repository_key(odps)
        if odps_key is None:
            return

        cls.odps_info[odps_key] = dict(
            access_id=utils.to_str(odps.account.access_id),
            secret_access_key=utils.to_str(odps.account.secret_access_key),
            project=utils.to_str(odps.project),
            endpoint=utils.to_str(odps.endpoint),
        )
        cls.odps_info_json = json.dumps([v for v in six.itervalues(cls.odps_info)])

    def _exec_cleanup_script(self):
        """
        Write the cleanup script to the temp directory and run it.

        Does nothing when no repository is registered, when already running
        inside a cleanup script, when not in the main process, or when the
        script has already been launched from this instance.
        """
        global cleanup_mode

        if not self:
            return

        env = self._env
        if cleanup_mode or not env.is_main_process or env.cleaned:
            return
        env.cleaned = True

        script = env.template.format(
            import_paths=env.import_path_json,
            odps_info=self.odps_info_json,
            host_pid=env.pid,
            biz_ids=self.biz_ids_json,
        )

        script_name = (
            env.temp_dir + env.os_sep + "tmp_" + str(env.pid) + "_cleanup_script.py"
        )
        # NOTE(review): the script carries the secret access key and is
        # restricted to the owner only after it has been written -- consider
        # creating the file with restrictive permissions from the start.
        script_file = env.io.FileIO(script_name, "w")
        try:
            script_file.write(env.conv_bytes(script))
        finally:
            script_file.close()
        try:
            if env.is_windows:
                env.os.chmod(script_name, env.file_right)
            else:
                env.subprocess.call(
                    ["chmod", oct(env.file_right).replace("o", ""), script_name]
                )
        except:  # noqa: E722
            # best effort: failing to tighten permissions must not block cleanup
            pass

        kwargs = dict(close_fds=True)
        if env.is_windows:
            # go through env like every other module access in this method
            si = env.subprocess.STARTUPINFO()
            si.dwFlags |= env.subprocess.STARTF_USESHOWWINDOW
            kwargs["startupinfo"] = si
        env.subprocess.call([env.executable, script_name], **kwargs)
# Repository keys already processed by clean_stored_objects in this process.
_cleaned_keys = set()
_obj_repos = (
    ObjectRepositoryLib()
)  # this line should be put last due to initialization dependency
# Launch the cleanup script when the interpreter exits.
atexit.register(_obj_repos._exec_cleanup_script)
def _is_pid_running(pid):
    """
    Tell whether a process with the given pid appears to be running.

    On Windows the answer comes from the first output line of ``TASKLIST``
    filtered by pid. Elsewhere signal 0 is sent with ``os.kill``; any
    ``OSError`` (including a permission error) is reported as not running.

    :param pid: process id to probe
    :return: ``True`` if the process was found, ``False`` otherwise
    """
    if sys.platform == "win32":
        # close the pipe explicitly instead of leaving it to garbage collection
        with os.popen('TASKLIST /FI "PID eq {0}" /NH'.format(pid)) as pipe:
            task_lines = pipe.read().strip().splitlines()
        if not task_lines:
            return False
        return str(pid) in set(task_lines[0].split())

    try:
        os.kill(pid, 0)
    except OSError:
        return False
    return True
def clean_objects(odps, biz_ids=None, use_threads=False):
    """
    Drop every temporary object recorded in the history files that match
    ``odps``, regardless of which process wrote them.

    :param odps: entry object used to derive the repository key and to drop objects
    :param biz_ids: biz ids to scan; defaults to the ids registered in this process
    :param use_threads: forwarded to :meth:`ObjectRepository.cleanup`
    """
    repo_key = _gen_repository_key(odps)
    if repo_key is None:
        return

    target_biz_ids = biz_ids or _obj_repos.biz_ids
    # collect all history files first, then clean them one after another
    history_files = [
        path
        for biz_id in target_biz_ids
        for path in glob.glob(os.path.join(TEMP_ROOT, biz_id, repo_key, "*.his"))
    ]
    for path in history_files:
        ObjectRepository(path).cleanup(odps, use_threads=use_threads)
def clean_stored_objects(odps):
    """
    Clean temporary objects left behind by finished processes.

    Runs at most once per repository key and only in the main process. History
    files are handled in a background thread; a file is skipped while the
    process that wrote it is still running, unless its pid equals ``host_pid``.
    How long the caller waits for the thread is governed by the module-level
    ``cleanup_timeout``: ``0`` does not wait, ``None`` or a negative value
    waits without limit, a positive value is the join timeout.

    :param odps: entry object used to derive the repository key and to drop objects
    """
    global cleanup_timeout, host_pid

    if not utils.is_main_process():
        return

    odps_key = _gen_repository_key(odps)
    if odps_key is None or odps_key in _cleaned_keys:
        return
    _cleaned_keys.add(odps_key)

    files = []
    for biz_id in _obj_repos.biz_ids:
        files.extend(glob.glob(os.path.join(TEMP_ROOT, biz_id, odps_key, "*.his")))

    def clean_thread():
        for fn in files:
            # the writer pid is the part between the last "__" and the extension
            try:
                writer_pid = int(fn.rsplit("__", 1)[-1].split(".", 1)[0])
            except ValueError:
                # unparsable name: skip it so the remaining files still get cleaned
                continue

            # we do not clean running process, unless its pid equals host_pid
            if writer_pid != host_pid and _is_pid_running(writer_pid):
                continue

            repo = ObjectRepository(fn)
            repo.cleanup(odps)

    thread_obj = threading.Thread(target=clean_thread)
    thread_obj.start()
    if cleanup_timeout == 0:
        return
    else:
        # a negative timeout is normalised to None (wait without limit); the
        # normalised value is stored back in the module-level setting
        if cleanup_timeout is not None and cleanup_timeout < 0:
            cleanup_timeout = None
        thread_obj.join(cleanup_timeout)
def _gen_repository_key(odps):
    """
    Build the repository key of an ODPS entry.

    The key is the MD5 hex digest of the account identity (access id, or token
    when no access id is set), the endpoint and the project joined by
    ``####``. Returns ``None`` when the account has neither an access id nor a
    token.
    """
    account = odps.account
    if getattr(account, "access_id", None):
        identity = account.access_id
    elif getattr(account, "token", None):
        identity = utils.to_str(account.token)
    else:
        return None

    joined = "####".join([identity, odps.endpoint, str(odps.project)])
    return hashlib.md5(utils.to_binary(joined)).hexdigest()
def _put_objects(odps, objs):
    """
    Record ``objs`` in the repository that belongs to ``odps``.

    The repository (and the directory of its history file) is created on first
    use; the history file is written once after all objects have been added.
    Nothing happens when no repository key can be derived from ``odps``.
    """
    repo_key = _gen_repository_key(odps)
    if repo_key is None:
        return

    biz_id = options.biz_id or "default"
    ObjectRepositoryLib.add_biz_id(biz_id)

    if repo_key not in _obj_repos:
        if isinstance(odps.account, AliyunAccount):
            ObjectRepositoryLib.add_odps_info(odps)

        file_dir = os.path.join(TEMP_ROOT, biz_id, repo_key)
        try:
            if not os.path.exists(file_dir):
                os.makedirs(file_dir)
        except OSError:
            pass

        base_name = "temp_objs_{0}__{1}.his".format(SESSION_KEY, os.getpid())
        _obj_repos[repo_key] = ObjectRepository(os.path.join(file_dir, base_name))

    repo = _obj_repos[repo_key]
    # add everything without dumping, then persist once
    for obj in objs:
        repo.put(obj, False)
    repo.dump()
def register_temp_table(odps, table, project=None, schema=None):
    """
    Record one table name or an iterable of table names as temporary.

    ``project`` and ``schema`` fall back to those of ``odps`` when not given.
    """
    names = [table] if isinstance(table, six.string_types) else table
    temp_objs = []
    for name in names:
        temp_objs.append(
            TempTable(name, project or odps.project, schema=schema or odps.schema)
        )
    _put_objects(odps, temp_objs)
def register_temp_model(odps, model, project=None, schema=None):
    """
    Record one offline model name or an iterable of names as temporary.

    ``project`` and ``schema`` fall back to those of ``odps`` when not given.
    """
    names = [model] if isinstance(model, six.string_types) else model
    temp_objs = []
    for name in names:
        temp_objs.append(
            TempModel(name, project or odps.project, schema=schema or odps.schema)
        )
    _put_objects(odps, temp_objs)
def register_temp_resource(odps, resource, project=None, schema=None):
    """
    Record one resource name or an iterable of names as temporary.

    ``project`` and ``schema`` fall back to those of ``odps`` when not given.
    """
    names = [resource] if isinstance(resource, six.string_types) else resource
    temp_objs = []
    for name in names:
        temp_objs.append(
            TempResource(
                name, project or odps.project, schema=schema or odps.schema
            )
        )
    _put_objects(odps, temp_objs)
def register_temp_function(odps, func, project=None, schema=None):
    """
    Record one function name or an iterable of names as temporary.

    ``project`` and ``schema`` fall back to those of ``odps`` when not given.
    """
    names = [func] if isinstance(func, six.string_types) else func
    temp_objs = []
    for name in names:
        temp_objs.append(
            TempFunction(
                name, project or odps.project, schema=schema or odps.schema
            )
        )
    _put_objects(odps, temp_objs)
def register_temp_volume_partition(
    odps, volume_partition_tuple, project=None, schema=None
):
    """
    Record one ``(volume, partition)`` tuple or an iterable of such tuples as
    temporary.

    ``project`` and ``schema`` fall back to those of ``odps`` when not given.
    """
    if isinstance(volume_partition_tuple, tuple):
        pairs = [volume_partition_tuple]
    else:
        pairs = volume_partition_tuple

    temp_objs = []
    for volume, partition in pairs:
        temp_objs.append(
            TempVolumePartition(
                volume,
                partition,
                project or odps.project,
                schema=schema or odps.schema,
            )
        )
    _put_objects(odps, temp_objs)