def _rmtree()

in optimum/intel/openvino/utils.py [0:0]


def _rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onexc or
    onerror is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    the value of exc_info describes the exception. For onexc it is the
    exception instance, and for onerror it is a tuple as returned by
    sys.exc_info().  If ignore_errors is false and both onexc and
    onerror are None, the exception is reraised.

    onerror is deprecated and only remains for backwards compatibility.
    If both onerror and onexc are set, onerror is ignored and onexc is used.

    Symbolic links found inside the tree are removed as links; their targets
    are never descended into. Calling this on a path that is itself a
    symbolic link reports an OSError through the error handler.
    """
    # The fd-based walker needs dir_fd support for every primitive it uses,
    # plus scandir() on a file descriptor and lstat-style stat().
    _use_fd_functions = (
        {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd
        and os.scandir in os.supports_fd
        and os.stat in os.supports_follow_symlinks
    )

    if hasattr(os.stat_result, "st_file_attributes"):
        # Stat results expose file attributes here, so mount-point reparse
        # points (junctions) are treated like symlinks as well.
        def _rmtree_islink(path):
            try:
                st = os.lstat(path)
                return stat.S_ISLNK(st.st_mode) or (
                    st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                    and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT
                )
            except OSError:
                return False

    else:

        def _rmtree_islink(path):
            return os.path.islink(path)

    def _rmtree_safe_fd(stack, onexc):
        # Each stack item has four elements:
        # * func: The first operation to perform: os.lstat, os.close or os.rmdir.
        #   Walking a directory starts with an os.lstat() to detect symlinks; in
        #   this case, func is updated before subsequent operations and passed to
        #   onexc() if an error occurs.
        # * dirfd: Open file descriptor, or None if we're processing the top-level
        #   directory given to rmtree() and the user didn't supply dir_fd.
        # * path: Path of file to operate upon. This is passed to onexc() if an
        #   error occurs.
        # * orig_entry: os.DirEntry, or None if we're processing the top-level
        #   directory given to rmtree(). We used the cached stat() of the entry to
        #   save a call to os.lstat() when walking subdirectories.
        func, dirfd, path, orig_entry = stack.pop()
        name = path if orig_entry is None else orig_entry.name
        try:
            if func is os.close:
                os.close(dirfd)
                return
            if func is os.rmdir:
                os.rmdir(name, dir_fd=dirfd)
                return

            # Note: To guard against symlink races, we use the standard
            # lstat()/open()/fstat() trick.
            assert func is os.lstat
            if orig_entry is None:
                orig_st = os.lstat(name, dir_fd=dirfd)
            else:
                orig_st = orig_entry.stat(follow_symlinks=False)

            func = os.open  # For error reporting.
            topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd)

            func = os.path.islink  # For error reporting.
            try:
                if not os.path.samestat(orig_st, os.fstat(topfd)):
                    # Symlinks to directories are forbidden, see GH-46010.
                    raise OSError("Cannot call rmtree on a symbolic link")
                # Pushed first so it is popped last: the directory is removed
                # only after its contents and after its descriptor is closed.
                stack.append((os.rmdir, dirfd, path, orig_entry))
            finally:
                # Always schedule the close, even when the samestat check
                # failed, so the descriptor opened above cannot leak.
                stack.append((os.close, topfd, path, orig_entry))

            func = os.scandir  # For error reporting.
            with os.scandir(topfd) as scandir_it:
                entries = list(scandir_it)
            for entry in entries:
                fullname = os.path.join(path, entry.name)
                try:
                    if entry.is_dir(follow_symlinks=False):
                        # Traverse into sub-directory.
                        stack.append((os.lstat, topfd, fullname, entry))
                        continue
                except OSError:
                    # Could not classify the entry; fall through and let
                    # unlink() report any real problem.
                    pass
                try:
                    os.unlink(entry.name, dir_fd=topfd)
                except OSError as err:
                    onexc(os.unlink, fullname, err)
        except OSError as err:
            err.filename = path
            onexc(func, path, err)

    def _rmtree_unsafe(path, onexc):
        def _on_walk_error(err):
            onexc(os.scandir, err.filename, err)

        # The private sentinel itself must be handed to os.walk(): it asks for
        # directory symlinks to be reported as plain files. Passing a bare
        # True instead would make os.walk() follow those links and delete
        # whatever they point at. Without the sentinel, fall back to
        # followlinks=False, which lists such links in dirnames unvisited.
        walk_links_mode = getattr(os, "_walk_symlinks_as_files", False)
        results = os.walk(path, topdown=False, onerror=_on_walk_error, followlinks=walk_links_mode)
        for dirpath, dirnames, filenames in results:
            for name in dirnames:
                fullname = os.path.join(dirpath, name)
                remove = os.rmdir
                if walk_links_mode is False and os.path.islink(fullname):
                    # Fallback mode only: a directory symlink shows up among
                    # the directories; remove the link, not its target.
                    remove = os.unlink
                try:
                    remove(fullname)
                except OSError as err:
                    onexc(remove, fullname, err)
            for name in filenames:
                fullname = os.path.join(dirpath, name)
                try:
                    os.unlink(fullname)
                except OSError as err:
                    onexc(os.unlink, fullname, err)
        try:
            os.rmdir(path)
        except OSError as err:
            onexc(os.rmdir, path, err)

    # Pick the effective error handler. A caller-supplied onexc is used as is
    # unless ignore_errors overrides it.
    if ignore_errors:

        def onexc(*args):
            pass

    elif onexc is None:
        if onerror is None:

            def onexc(*args):
                # Always invoked from inside an ``except`` block, so a bare
                # raise re-raises the exception being handled.
                raise

        else:
            # delegate to onerror
            def onexc(*args):
                func, path, exc = args
                if exc is None:
                    exc_info = None, None, None
                else:
                    exc_info = type(exc), exc, exc.__traceback__
                return onerror(func, path, exc_info)

    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        stack = [(os.lstat, dir_fd, path, None)]
        try:
            while stack:
                _rmtree_safe_fd(stack, onexc)
        finally:
            # Close any file descriptors still on the stack.
            while stack:
                func, fd, path, entry = stack.pop()
                if func is not os.close:
                    continue
                try:
                    os.close(fd)
                except OSError as err:
                    onexc(os.close, path, err)
    else:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError as err:
            onexc(os.path.islink, path, err)
            # can't continue even if onexc hook returns
            return
        return _rmtree_unsafe(path, onexc)