core/lib/util.py (105 lines of code) (raw):
#!/usr/bin/env python3
"""
Copyright (c) 2017-present, Facebook, Inc.
All rights reserved.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
"""
import logging
import os
import re
import stat
import subprocess
from .error import OSCError
log = logging.getLogger(__name__)
def rm(filename, sudo=False):
"""
Remove a file on the disk, not us os.rm because we want to add timeout to
the command. It's possible that the os call got hang when the disk has
some problems
"""
cmd_args = []
if sudo:
cmd_args += ["sudo"]
cmd_args += ["/bin/rm", filename]
log.debug("Executing cmd: {}".format(str(cmd_args)))
proc = subprocess.Popen(cmd_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
try:
(stdout, stderr) = proc.communicate(timeout=20)
# return True if returncode is success (0)
return not proc.returncode
except subprocess.TimeoutExpired:
proc.kill()
raise OSCError("SHELL_TIMEOUT", {"cmd": " ".join(cmd_args)})
def sync_dir(dirname):
"""
Calling fsync on the directory. This is for synching
deleted files to storage devices to prevent trim stalls.
"""
dirfd = os.open(dirname, os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
def is_file_readable(filepath):
"""
Check if the file given is readable to the user we are currently running
at
"""
uid = os.getuid()
euid = os.geteuid()
gid = os.getgid()
egid = os.getegid()
# This is probably true most of the time, so just let os.access()
# handle it. Avoids potential bugs in the rest of this function.
if uid == euid and gid == egid:
return os.access(filepath, os.R_OK)
st = os.stat(filepath)
if st.st_uid == euid:
return st.st_mode & stat.S_IRUSR != 0
groups = os.getgroups()
if st.st_gid == egid or st.st_gid in groups:
return st.st_mode & stat.S_IRGRP != 0
return st.st_mode & stat.S_IROTH != 0
def disk_partition_free(path):
"""
For given file path, return the size of free space in bytes of the
underlying disk
@param path: Full path string for which the disk space we need to get
@type path: string
@return: free disk space in btyes
@rtype : int
"""
try:
vfs = os.statvfs(path)
return vfs.f_bavail * vfs.f_bsize
except Exception:
log.exception("Exception when trying to get disk free space: ")
raise OSCError("UNABLE_TO_GET_FREE_DISK_SPACE", {"path": path})
def disk_partition_size(path):
"""
For given file path, return the total size in bytes of the
underlying disk partition
@param path: Full path in the partition for which the size we need to get
@type path: string
@return: total size in btyes
@rtype : int
"""
try:
vfs = os.statvfs(path)
return vfs.f_blocks * vfs.f_bsize
except Exception:
log.exception("Exception when trying to get partition size: ")
raise OSCError("UNABLE_TO_GET_PARTITION_SIZE", {"path": path})
def readable_size(nbytes):
"""
Translate a number representing byte size into a human readable form
@param nbytes: number representing bytes
@type nbytes: int
@return: readable size
@rtype : string
"""
suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
if nbytes == 0:
return "0 B"
i = 0
while nbytes >= 1024 and i < len(suffixes) - 1:
nbytes /= 1024.0
i += 1
f = ("%.2f" % nbytes).rstrip("0").rstrip(".")
return "%s %s" % (f, suffixes[i])
class RangeChain(object):
"""
A memory efficient class for memorize all the points that we've filled
within a range containing consecutive natural values.
Knowing that the missing points are much less than the number of filling
points, we only store missing points and record a end point of the range
"""
def __init__(self):
self._stop = 0
self._gap = []
def extend(self, points):
last_point = self._stop
for current_point in points:
# If it's consecutive then we should just extend the stop point
if current_point != last_point + 1:
for gap_point in range(last_point + 1, current_point):
self._gap.append(gap_point)
self._stop = current_point
last_point = current_point
def fill(self, point):
if point in self._gap:
self._gap.remove(point)
else:
if point > self._stop:
raise Exception(
"Trying to fill a value {} "
"beyond current covering range".format(point)
)
else:
raise Exception(
"Trying to fill a value {} which already exists".format(point)
)
def missing_points(self):
return self._gap
def dirname_for_db(db_name):
try:
converted_chars = []
for char in db_name:
if re.match("[a-zA-Z0-9_]", char):
converted_chars.append(char)
else:
hex_base = re.sub("^0x", "", str(hex(ord(char))))
while len(hex_base) < 4:
hex_base = "0" + hex_base
new_char = "@{}".format(hex_base)
converted_chars.append(new_char)
return "".join(converted_chars)
except Exception:
log.exception("Unable to convert db name %s to a valid directory name", db_name)
raise