pantri/scripts/lib/utils.py (255 lines of code) (raw):
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
# Utility functions.
import getpass
import glob
import hashlib
import json
import logging
import os
import platform
import re
import shutil
import subprocess
# Third party modules
import git
def run(cmd, cwd=None, sanitize=True):
"""
Quick clone of shell_tools.run.
TODO: Improve error handling
"""
# TODO Determine if there is a better way to do this.
# Setting cwd within the repo
if not cwd:
cwd = os.path.dirname(get_paths()["repo_root"])
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=cwd, universal_newlines=True)
stdout, stderr = p.communicate()
status_code = p.wait()
result_dict = {
"stdout": sanitize_output(stdout) if sanitize else stdout,
"stderr": sanitize_output(stderr) if sanitize else stderr,
"status": status_code,
"success": True if status_code == 0 else False,
}
return result_dict
def sanitize_output(text):
# Return a stripped string without newlines
if text:
return text.strip().replace("\n", "").replace("\r", "")
def read_file(filename):
"""
read_file(filename)
Reads content of a file or returns "" if file doesn't exists
"""
if not os.path.exists(filename):
return ""
with open(filename) as myfile:
return myfile.read()
def create_parent_directory_if_necessary(filename):
if not os.path.exists(filename):
file_path = os.path.splitdrive(filename)[-1]
folders = os.path.split(file_path)[0]
try:
os.makedirs(folders)
except Exception:
pass
def write_file(filename, content=None, mode="w"):
"""
write_file(filename, content=None, mode="w")
Creates a file including parent directories, default mode mode is 'w'
"""
create_parent_directory_if_necessary(filename)
with open(filename, mode) as myfile:
myfile.write(str(content))
def write_json_file(filename, data, mode="w", indent=2, sort_keys=True):
"""
write_json_file(filename, data, mode='w', indent=2, sort_keys=True)
Writes a json object to a file, creating parent directories if necessary.
"""
assert isinstance(data, dict), "Json object type must be a dict!"
create_parent_directory_if_necessary(filename)
with open(filename, mode) as json_file:
json.dump(data, json_file, indent=indent, sort_keys=sort_keys)
def get_sha1(file_path):
"""
get_sha1(file_path)
Returns the sha1 of the file_path
copied from fs_tools
"""
if os.path.exists(file_path):
hash = hashlib.sha1()
with open(file_path, "rb") as f:
chunk = 0
while chunk != b"":
chunk = f.read(4096)
hash.update(chunk)
return hash.hexdigest()
else:
return None
def get_sha256(filename, block_size=65536):
"""
get_sha256(file_path)
Returns the sha256 the filename
"""
if not os.path.exists(filename):
return None
sha256 = hashlib.sha256()
with open(filename, "rb") as f:
for block in iter(lambda: f.read(block_size), b""):
sha256.update(block)
return sha256.hexdigest()
def get_username():
"""
get_username()
Return Username
"""
os_platform = platform.system()
if os_platform == "Darwin":
cmd = ["/usr/bin/stat", "-f%Su", "/dev/console"]
default_username = run(cmd)["stdout"]
else:
default_username = getpass.getuser()
username = input("Username [%s]: " % default_username)
if not username:
username = default_username
return username
def get_user_home_dir() -> str:
"""
Grabs the home directory of the user's system
We store our 'pantri_config.json' file here
"""
standard_home = os.path.expanduser("~")
if os.path.exists(standard_home):
return standard_home
logging.getLogger("pantri").error(
"Error: home path not found. Try running Pantri again."
)
exit()
def get_paths():
""" Return dict of paths based on it-bin git repo path """
repo = git.cmd.Git(get_itbin_dir())
repo_root = repo.rev_parse(show_toplevel=True)
return {
"repo_root": repo_root,
"scripts": os.path.join(repo_root, "scripts"),
"logs": os.path.join(repo_root, "scripts", "logs"),
"shelves": os.path.join(repo_root, "shelves"),
"dest_sync": os.path.join(repo_root, "dest_sync"),
"git_config": os.path.join(repo_root, ".git", "config"),
"git_exclude": os.path.join(repo_root, ".git", "info", "exclude"),
"git_ignore": os.path.join(repo_root, ".gitignore"),
"auth_token": os.path.join(repo_root, ".pantri_auth_token"),
}
def get_itbin_dir() -> str:
"""
Checks to see if 'pantri_config.json' exists
- Yes -> check it-bin path inside config if valid
- No -> ask for it-bin path and create conf for future use
Order of precedence:
CWD -> config/flag -> prompt
"""
# CWD
if verify_git_repo(os.getcwd()):
return os.getcwd()
# config file
home = get_user_home_dir()
config_path = os.path.join(home, ".pantri_config.json")
if os.path.exists(config_path):
with open(config_path, "r") as f:
config = json.load(f)
if verify_git_repo(config["itbin_path"]):
return config["itbin_path"]
# prompt
it_bin_dir = prompt_for_itbin_dir()
if verify_git_repo(it_bin_dir):
create_conf(it_bin_dir)
return it_bin_dir
logging.getLogger("pantri").error("Error: not a Git repository")
raise git.exc.GitError("Path is not a git repository.")
def prompt_for_itbin_dir() -> str:
"""
Ask for the users it-bin path on their devserver
Verifies path, if valid then create config file
"""
repo = input("Please enter your it-bin filepath: ") or ""
repo = repo.strip()
path = os.path.expanduser(repo)
if not os.path.exists(path):
logging.getLogger("pantri").error("Error: path not found")
exit()
return path
def create_conf(path):
"""
Create configuration file
store it-bin path so can be used on later Pantri uses
"""
home = get_user_home_dir()
path_to_config = os.path.join(home, ".pantri_config.json")
git_root = get_top_level_git(path)
config = {"itbin_path": str(git_root)}
with open(path_to_config, "w") as f:
json.dump(config, f)
def get_top_level_git(path):
"""
Grab the top level git path to store in our config file
Used incase user types in something like it-bin/scripts, still returns it-bin
so we have correct paths. (it-bin/dest-sync instead of it-bin/scripts/dest-sync)
"""
git_root = path
try:
git_repo = git.Repo(path, search_parent_directories=True)
git_root = git_repo.git.rev_parse("--show-toplevel")
except git.exc.GitError:
logging.getLogger("pantri").error("Path is not in a git repository")
raise git.exc.GitError("Path is not a git repository.")
return git_root
def get_shelf_directory(object_path):
"""
Return the shelves directory of objects being uploaded. ie chef or mdt_images
shelf_dir is used to determine non-default settings for uploading/syncing
"""
shelves = get_paths()["shelves"]
# Get relative path of object to "shelves" dir
shelf_dir = os.path.relpath(object_path[0], shelves)
# Split on os separator and return top directory which will be shelf name
return shelf_dir.split(os.sep)[0]
def verify_git_repo(repo_path):
""" Verify script ran within the it-bin git repo"""
if not os.path.exists(repo_path):
logging.getLogger("pantri").error("Error: path cannot be found.")
exit()
# Check to see if it is a git repository
try:
git_remote = git.cmd.Git(repo_path).remote(verbose=True)
except git.exc.GitError:
return False
# If git repository, make sure it is it-bin git repository
if re.search("/it-bin", git_remote):
return True
return False
def get_git_commits():
"""
get_git_commits()
Return the pervious and current from log/refs/head/master
"""
# Grab commits from last entry in log/refs/head/master
repo_path = get_paths()["repo_root"]
commits = str(git.Repo(repo_path).heads.master.log()[-1]).split()
# Return null commit id if no commits are in the refs log
if not commits:
return (
0000000000000000000000000000000000000000,
0000000000000000000000000000000000000000,
)
previous_commit_id = commits[0]
current_commit_id = commits[1]
return previous_commit_id, current_commit_id
def changed_files():
"""
changed_files()
Returns (added, modified, deleted) files between git pulls.
Code copied from: fbcode/scm/lib/gitrepo.py and changed to use GitPython
"""
added = []
modified = []
deleted = []
repo_path = get_paths()["repo_root"]
previous_commit_id, current_commit_id = get_git_commits()
# Commit id of all zeros indicates repo was just cloned, therefore don't dont
# need to check what files changed.
if previous_commit_id == "0000000000000000000000000000000000000000":
return (added, modified, deleted)
# Parse diff tree to determine which files where changed between git pulls
parts = (
git.Git(repo_path)
.diff_tree(
[
"--name-status",
"-z",
"--root",
"-m",
"-r",
previous_commit_id,
current_commit_id,
]
)
.split("\0")
)
# Loop though changes files and determine changed/added/deleted files.
# Logic copied from fbcode.
offset = 0
while offset < len(parts) - 1:
kind = parts[offset]
path = parts[offset + 1]
if len(kind) == 40:
# It's a merge commit and diff-tree prints the diff between
# both parents (separated by the commit hash). Just skip the
# hash return a list of all the files that have changed.
offset += 1
continue
offset += 2
if kind == "M" or kind == "T":
modified.append(path)
elif kind == "A":
added.append(path)
elif kind == "D":
deleted.append(path)
return (added, modified, deleted)
def remove(paths):
"""
remove(paths)
This will remove files/directories recursively. Supports "paths" being a
list of paths and wildcard in file names.
Remove does nothing if path does not exist.
"""
# recursively call remove if paths is a list
if isinstance(paths, list):
for file_path in paths:
remove(file_path)
return
# Using glob to support wildcard in filenames
for file_path in glob.glob(paths):
# Only attempt to remove path if it exists
if not os.path.exists(file_path):
continue
# Remove directory
if os.path.isdir(file_path):
try:
shutil.rmtree(file_path)
except Exception:
logging.getLogger("pantri").error("Error: %s not removed" % file_path)
continue
# Remove files.
if os.path.isfile(file_path):
try:
os.remove(file_path)
except Exception:
logging.getLogger("pantri").error("Error: %s not removed" % file_path)
def get_modified_time(file_path):
"""
get_modified_time(file_path)
Returns the the modified time (in seconds) of the file
"""
if os.path.exists(file_path):
return int(os.path.getmtime(file_path))
return None
def get_file_size(file_path):
"""
get_file(file_path)
Returns file size in bytes of the file
"""
if os.path.exists(file_path):
return int(os.path.getsize(file_path))
return None
def unix_path(path):
"""
unix_path(path)
Convert a path to use forward slash (/) instead of double backslash (\\).
Needed when running script on windows.
"""
return path.replace("\\", "/")
def is_binary(file_path):
"""
is_binary(file_path)
Uses 'file' system cmd to determine if a file is binary.
"""
bin_regex = re.compile(r"binary")
# Command paths per platform
if platform.system() == "Windows":
file_cmd = __win_find_file_exe()
else:
file_cmd = "/usr/bin/file"
file_output = run([file_cmd, "--mime-encoding", "-b", file_path])["stdout"]
if bin_regex.search(file_output) is not None:
return True
return False
def __win_find_file_exe():
"""
__win_find_file_exe()
Searches for the binary `file.exe` on Windows which is used to determine a
file's type.
"""
exe_path = "Git\\usr\\bin\\file.exe"
search_paths = [
os.path.join(os.getenv("ProgramFiles(x86)"), exe_path),
os.path.join(os.getenv("ProgramFiles"), exe_path),
os.path.join(os.getenv("ProgramW6432"), exe_path),
]
for path in search_paths:
if os.path.isfile(path):
return path
raise OSError("Git is not installed!")