pantri/scripts/lib/pantri.py
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import fnmatch
import getpass
import json
import logging
import os
import platform
import re
import sys
import time
from typing import Any, Dict
# Third party modules
import dirsync
import git
# from dsi.logger.py.LoggerConfigHandler import LoggerConfigHandler
from cpe.pyexec.modules import api_tools
from . import manifold # noqa: F401
from . import config, utils
from .fb_objectstore import FB_ObjectStore
class Pantri(object):
"""
Main class for Pantri
"""
def __init__(self, cli_options=None):
"""
__init__(self):
Instantiate class variables
"""
if cli_options is None:
cli_options = {}
self.logger = logging.getLogger("pantri")
self.paths = utils.get_paths()
self.git_path = self.paths["repo_root"]
self.gitignore = self.read_gitignore()
self.item_data = []
# Get options from config
self.shelf = "default"
if "shelf" in cli_options:
self.shelf = cli_options["shelf"]
if "objects" in cli_options:
self.shelf = utils.get_shelf_directory(cli_options["objects"])
self.options = config.get_options(self.shelf, cli_options)
def __del__(self):
for data in self.item_data:
api_tools.log_to_logger("CPEPantriRunsLoggerConfig", data)
self.logger.info("Finished")
def get_objects_to_upload(self, objects):
"""
get_objects_to_upload(self, objects)
        Given a list of object paths, return a dictionary containing metadata of
        a file or files within a directory.
"""
expanded_objects = []
objects_metadata = {}
        # Loop through objects to build list of all files to upload
for obj in objects:
# Use objects full path when building list of objects
obj = utils.unix_path(os.path.abspath(obj))
# Only upload objects within "shelves" directory
            if not obj.startswith(self.git_path):
self.logger.error(
"Object %s is not within %s " % (obj, self.paths["shelves"])
)
# TODO create exit functions and update all call sites
sys.exit(1)
# TODO create function to return list of files.
# Build list of objects
if os.path.isfile(obj):
expanded_objects.append(obj)
elif os.path.isdir(obj):
for (root, _dirs, files) in os.walk(obj):
for f in files:
obj = os.path.join(root, f)
expanded_objects.append(obj)
else:
                self.logger.warning("Local file '%s' not found" % obj)
        # Process list of objects to calculate file size, modified time and hash
objects_metadata = self.process_objects(expanded_objects)
return objects_metadata
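    # Illustrative usage (hypothetical path, as store() does further below):
    #   self.get_objects_to_upload(["shelves/tools"])
    # walks the directory and returns metadata keyed by each file's path
    # relative to the shelves directory, e.g. "tools/example.exe".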
def process_objects(self, expanded_objects=None):
"""
process_objects(expanded_objects)
        Given a list of objects, determine if each is uploadable (binary), and
        then create a dictionary of:
sha1_hash
sha256_hash
modified_time
filesize
        The sha1 hash is only computed on first upload or if the modified time
        and file size have changed.
"""
if expanded_objects is None:
expanded_objects = []
objects_metadata = {}
for obj in expanded_objects:
# Process if object is uploadable
if self.uploadable_object(obj):
                # Object name in metadata file. Replace \\ with / to remain consistent
                # across platforms
object_name = utils.unix_path(
os.path.relpath(obj, self.paths["shelves"])
)
# Determine paths
object_path = os.path.abspath(obj)
object_metadata_file = "%s.pitem" % object_path
# Add object to gitignore
self.add_object_to_gitignore(obj)
object_mtime = utils.get_modified_time(obj)
object_file_size = utils.get_file_size(obj)
                # Use cached checksums since hashing is cpu intensive, while comparing
                # file size and modified time is quick. Checksums are forced with the
                # cli flag --checksum.
if not self.options["checksum"] and os.path.exists(
object_metadata_file
):
with open(object_metadata_file) as json_file:
cached_metadata = json.load(json_file)
# Use cached hash if filesize and mtime are the same
if (
object_file_size == cached_metadata[object_name]["file_size"]
and object_mtime
== cached_metadata[object_name]["modified_time"]
):
                        object_sha1_hash = cached_metadata[object_name]["sha1_hash"]
                        if "sha256_hash" in cached_metadata[object_name]:
                            object_sha256_hash = cached_metadata[object_name][
                                "sha256_hash"
                            ]
                        else:
                            object_sha256_hash = utils.get_sha256(obj)
else:
object_sha1_hash = utils.get_sha1(obj)
object_sha256_hash = utils.get_sha256(obj)
else:
                    # Generate hashes if cached metadata is not present
object_sha1_hash = utils.get_sha1(obj)
object_sha256_hash = utils.get_sha256(obj)
# Add object to metadata dictionary
objects_metadata[object_name] = {
"sha1_hash": object_sha1_hash,
"sha256_hash": object_sha256_hash,
"modified_time": object_mtime,
"file_size": object_file_size,
}
return objects_metadata
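    # Illustrative example (hypothetical file and values): the returned metadata
    # has the same shape as the .pitem files written by write_diff_file(), e.g.
    #   {
    #       "tools/example.exe": {
    #           "sha1_hash": "<40-char hex digest>",
    #           "sha256_hash": "<64-char hex digest>",
    #           "modified_time": 1577836800,
    #           "file_size": 1048576,
    #       }
    #   }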
def get_uploaded_objects(self):
"""
get_uploaded_objects(self)
        Walk through the git repo and build one giant dictionary of uploaded objects
"""
uploaded_objects = {}
# Default sync is all directories under shelves unless specified
shelf = self.paths["shelves"]
if "shelf" in self.options:
shelf = os.path.join(shelf, self.options["shelf"])
# TODO create function to get list of files
        # Loop through all files to determine which files were uploaded.
for (root, _dirs, files) in os.walk(shelf):
for f in files:
obj = os.path.join(root, f)
filename = os.path.basename(obj)
# Only care about *.pitem files
if re.match("^.*.pitem$", filename):
with open(obj) as json_file:
uploaded_objects.update(json.load(json_file))
return uploaded_objects
def get_objects_on_disk(self):
"""
get_objects_on_disk(self)
        Walk through local storage and build one giant dictionary of objects on disk
"""
objects_on_disk = {}
download_path = self.options["dest_sync"]
if "shelf" in self.options:
download_path = os.path.join(download_path, self.options["shelf"])
for (root, _dirs, files) in os.walk(download_path):
for f in files:
obj = os.path.join(root, f)
object_name = utils.unix_path(
os.path.relpath(obj, self.options["dest_sync"])
)
# Return sha1 hash if checksum is enabled
if self.options["checksum"]:
objects_on_disk.update(
{object_name: {"sha1_hash": utils.get_sha1(obj)}}
)
else:
objects_on_disk.update(
{
object_name: {
"modified_time": utils.get_modified_time(obj),
"file_size": utils.get_file_size(obj),
}
}
)
return objects_on_disk
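    # Illustrative example (hypothetical values): without --checksum each entry
    # looks like
    #   {"tools/example.exe": {"modified_time": 1577836800, "file_size": 1048576}}
    # and with --checksum it is
    #   {"tools/example.exe": {"sha1_hash": "<40-char hex digest>"}}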
def sync_local_files(self):
"""
Sync non-binary files in source to dest_path
"""
# Define source and destination paths
src_path = self.paths["shelves"]
dest_path = self.options["dest_sync"]
if "shelf" in self.options:
src_path = os.path.join(src_path, self.options["shelf"])
dest_path = os.path.join(dest_path, self.options["shelf"])
        # Sync if paths don't match. No need to sync on top of the same path.
        if src_path != dest_path:
dirsync.sync(
src_path,
dest_path,
"sync",
logger=self.logger,
create=True,
verbose=True,
exclude=["^.*.pitem$"],
)
def get_objects_to_retrieve(self):
if "pitem" in self.options:
if os.path.exists(self.options["pitem"]):
try:
with open(self.options["pitem"]) as json_file:
item = json.load(json_file)
uploaded_objects = item
dl_path = os.path.join(self.paths["repo_root"], "shelves")
self.options["dest_sync"] = dl_path
except Exception:
self.logger.error("Not a .pitem")
raise Exception("Did not specify a proper .pitem during retrieval")
else:
self.logger.error("Path to retrieval item does not exist.")
raise OSError("Retrieval path doesn't exist.")
else:
uploaded_objects = self.get_uploaded_objects()
objects_on_disk = self.get_objects_on_disk()
        # Compare uploaded objects with objects on disk. Download missing files.
objects_to_retrieve = {}
for obj in uploaded_objects:
if obj not in objects_on_disk:
self.logger.info("Download Object: %s (Object not present)" % obj)
# Build dictionary of objects to download
objects_to_retrieve.update({obj: uploaded_objects[obj]})
continue
# Compare sha1 hashes if checksum is enabled
if (
self.options["checksum"]
and objects_on_disk[obj]["sha1_hash"]
== uploaded_objects[obj]["sha1_hash"]
):
self.logger.info("Skip Object: %s (matching hash)" % obj)
continue
# Check file size and modified times
if (
objects_on_disk[obj]["file_size"] == uploaded_objects[obj]["file_size"]
and objects_on_disk[obj]["modified_time"]
== uploaded_objects[obj]["modified_time"]
):
self.logger.info(
"Skip Object: %s (matching file size and modified time.)" % obj
)
continue
# Build dictionary of objects to download
self.logger.info("Download Object: %s (object different)" % obj)
objects_to_retrieve.update({obj: uploaded_objects[obj]})
return objects_to_retrieve
def get_objects_to_delete(self):
"""
get_objects_to_delete()
Return a list of files removed from git repo between pulls
"""
        # changed_files returns a tuple of (added, modified, deleted)
deleted_files = utils.changed_files()[2]
objects_to_delete = []
for obj in deleted_files:
# Delete files within shelves dir only.
if not obj.startswith("shelves"):
continue
# Git returns the relative path of updated objects from git root.
# Build object full path using repo_root path
obj_path = os.path.join(self.paths["repo_root"], obj)
# Determine object path within "shelves" directory
obj_shelves_path = os.path.relpath(obj_path, self.paths["shelves"])
# Build sync path
obj_dest_path = os.path.join(self.paths["dest_sync"], obj_shelves_path)
            # Strip the .pitem extension so the actual binary file is deleted.
if obj_dest_path.endswith("pitem"):
obj_dest_path = os.path.splitext(obj_dest_path)[0]
# Append objects to delete
objects_to_delete.append(obj_dest_path)
return objects_to_delete
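    # Illustrative example (hypothetical paths): a "shelves/tools/example.exe.pitem"
    # deleted from git maps to "<dest_sync>/tools/example.exe" on disk, which is
    # the file that gets removed.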
def nothing_to_retrieve(self):
# Verify git config options are set and the auth token exists
self.configure()
try:
git_pull = git.cmd.Git(self.paths["repo_root"]).pull()
except git.exc.GitCommandError as e:
self.logger.error(
"Git pull failed. Please be on master"
f" or set your upstream to master: {e}."
)
raise
# perform a retrieve if the force flag is passed or retrieve a single pitem
if "force" in self.options or "pitem" in self.options:
return False
if re.match("^Already up-to-date.$", git_pull):
return True
return False
def retrieve(self):
"""
retrieve(self)
        Compares files uploaded (json files in git) to objects already on disk
        to determine which files to download. Files on disk that are not in git
        are removed.
"""
self.logger.info("Beginning to retrieve files.")
objects_to_delete = []
if "pitem" not in self.options:
# Sync non-binary files in shelves to dest_sync
self.sync_local_files()
# Delete objects in dest_sync on disk
objects_to_delete = self.get_objects_to_delete()
if objects_to_delete:
utils.remove(objects_to_delete)
# Download objects
objects_to_retrieve = self.get_objects_to_retrieve()
verify_retrieve = []
if objects_to_retrieve:
module = self.use_objectstore(self.options["object_store"])
with module(self.options) as objectstore:
for obj in objectstore.retrieve(objects_to_retrieve):
verify_retrieve.append(obj)
else:
self.logger.info("Repo up-to-date. No files to retrieve.")
self.build_itemdata(objects_to_retrieve, verify_retrieve)
self.write_updated_objects_to_file(objects_to_retrieve, objects_to_delete)
def write_updated_objects_to_file(self, objects_retrieved, objects_removed):
# Default json
updated_objects = {"retrieved": [], "removed": []}
        # objects_retrieved is a dict. Loop through, build the full path, and append
for obj in objects_retrieved:
obj_abs_path = os.path.join(self.paths["dest_sync"], obj)
updated_objects["retrieved"].append(obj_abs_path)
# objects_removed is an array. Extend array
updated_objects["removed"].extend(objects_removed)
# Write updated_objects file to disk
filename = "%s_updated_objects.json" % self.shelf
if self.shelf == "default":
filename = "all_updated_objects.json"
updated_objects_path = os.path.join(self.paths["scripts"], filename)
utils.write_json_file(updated_objects_path, updated_objects)
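    # Illustrative example (hypothetical contents): a sync of the "tools" shelf
    # would write "tools_updated_objects.json" into the scripts directory, e.g.
    #   {
    #       "retrieved": ["<dest_sync>/tools/example.exe"],
    #       "removed": ["<dest_sync>/tools/old_tool.exe"]
    #   }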
def configure(self):
"""Verifies local git settings and auth token exists"""
# Verify ignorecase config is set for it-bin repo
git_config = utils.read_file(self.paths["git_config"])
if not re.search("ignorecase\s=\strue", git_config):
cwd = os.path.join(utils.get_paths()["repo_root"], ".git")
utils.run(["git", "config", "--local", "core.ignorecase", "true"], cwd)
# Verify auth token is present.
FB_ObjectStore(self.options).get_auth_token()
def store(self):
# Verify git config options are set and the auth token exists
self.configure()
self.logger.info("Beginning to upload files.")
objects = self.options["objects"]
# Generate list of objects to store(upload)
objects_to_upload = self.get_objects_to_upload(objects)
# TODO consider when this should be written to disk.
# Update which files to exclude.
self.write_gitignore()
module = self.use_objectstore(self.options["object_store"])
verify_upload = []
with module(self.options) as objectstore:
for obj in objectstore.upload(objects_to_upload):
if obj in objects_to_upload:
verify_upload.append(obj)
self.write_diff_file({obj: objects_to_upload[obj]})
self.build_itemdata(objects_to_upload, verify_upload)
if len(verify_upload) != len(objects_to_upload):
unsuccessful_obj = objects_to_upload.keys() - verify_upload
self.logger.error(
"Unsuccessful uploads: \n" + ("\n".join(map(str, unsuccessful_obj)))
)
sys.exit(-1)
def uploadable_object(self, obj):
"""
        uploadable_object(obj)
        Given an object, determine whether it should be uploaded to the object
        store. An uploadable object is defined as a binary file that does not
        match the "ignore_patterns" listed in config.
"""
# Exclude generated files.
filename = os.path.basename(obj)
if re.match("^.*.pitem$", filename):
return False
        # Exclude files that match a pattern defined in config, e.g. "*.pyc"
for pattern in self.options["ignore_patterns"]:
if fnmatch.fnmatch(filename, pattern):
return False
        # Binary overrides match a pattern defined in config, e.g. "*.pyc"
for pattern in self.options["binary_overrides"]:
if fnmatch.fnmatch(filename, pattern):
return True
# Binary check
object_path = os.path.abspath(obj)
if utils.is_binary(object_path):
return True
return False
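    # Illustrative examples (assuming hypothetical config values
    # ignore_patterns = ["*.pyc"] and binary_overrides = ["*.dat"]):
    #   "tools/example.exe.pitem" -> False (generated metadata file)
    #   "tools/cache.pyc"         -> False (matches an ignore pattern)
    #   "tools/blob.dat"          -> True  (matches a binary override)
    #   "tools/example.exe"       -> True or False, per utils.is_binary()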
def add_object_to_gitignore(self, obj):
"""
Determine extension or full path to add to gitignore
"""
# TODO making a lot of assumptions when making this list. Need to improve
# Build list of upload-able items to add to gitignore.
rel_obj_path = os.path.relpath(obj, self.git_path)
filename, file_ext = os.path.splitext(rel_obj_path)
# Add extension or full path to gitignore
if file_ext and re.match("^.[a-z]+$", file_ext, re.IGNORECASE):
ext = "*" + file_ext.lower()
if ext not in self.gitignore:
self.gitignore.append(ext)
else:
if rel_obj_path not in self.gitignore:
self.gitignore.append(rel_obj_path)
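    # Illustrative examples (hypothetical paths): "shelves/tools/example.exe"
    # adds the pattern "*.exe" to .gitignore, while an object with no usable
    # extension such as "shelves/tools/LICENSE" adds its relative path instead.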
def write_diff_file(self, object_metadata):
"""
write_diff_file(self, object_metadata)
        Write a json file containing metadata about an object.
"""
# Write diff file to git repo
path, filename = os.path.split(
os.path.join(self.paths["shelves"], list(object_metadata.keys())[0])
)
diff_file = "%s/%s.pitem" % (path, filename)
utils.write_json_file(diff_file, object_metadata)
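    # Illustrative example (hypothetical path): uploading "shelves/tools/example.exe"
    # writes its metadata to "shelves/tools/example.exe.pitem" in the git repo.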
def read_gitignore(self):
"""
Read contents of .gitignore
"""
gitignore_path = self.paths["git_ignore"]
gitignore = utils.read_file(gitignore_path).rstrip().split("\n")
return gitignore
def write_gitignore(self):
"""
Exclude binary files uploaded to object store.
"""
contents = ""
        # Loop through the list, sort, and remove duplicates
for item in sorted(set(self.gitignore)):
contents += item + "\n"
for path in [self.paths["git_ignore"], self.paths["git_exclude"]]:
utils.write_file(path, contents)
def use_objectstore(self, name):
components = name.split(".")
mod = __import__(components[0])
for comp in components[1:]:
            try:
                mod = getattr(mod, comp)
            except AttributeError:
                self.logger.error("Incorrect objectstore: %s", name)
                raise Exception("Not a valid objectstore: %s" % name)
self.logger.info("Using %s object store", mod)
return mod
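    # Illustrative example (hypothetical dotted path): given something like
    # "lib.fb_objectstore.FB_ObjectStore", this imports "lib" and then walks
    # getattr() down to the FB_ObjectStore class, which store()/retrieve()
    # later use as a context manager.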
def build_itemdata(self, total_objects, successful_objects):
"""
Build the payload data for every item
"""
for item in total_objects:
success = 0
if item in successful_objects:
success = 1
            # Object names use unix-style "/" separators regardless of platform
            shelf = item.split("/")[0]
mod_time = total_objects[item]["modified_time"]
data = self.build_payload(
self.options["method"],
item,
str(self.options["object_store"]),
shelf,
success,
str(total_objects[item]["sha1_hash"]),
time.strftime("%Y-%m-%d %H:%M:%S %z", time.gmtime(mod_time)),
total_objects[item]["file_size"],
log_url=None,
arguments=None,
)
self.item_data.append(data)
def build_payload(
self,
command,
object_name,
object_store,
shelf,
return_code,
checksum,
date_modified,
filesize,
log_url,
arguments,
):
"""
Build a dictionary representing the item data for logging
"""
json_dict = {} # type: Dict[str, Any]
json_dict["unixname"] = getpass.getuser()
json_dict["command"] = command
json_dict["object_name"] = object_name
if "FB" in object_store:
json_dict["object_store"] = "Swift"
else:
json_dict["object_store"] = "Manifold"
json_dict["shelf"] = shelf
json_dict["return_code"] = return_code
json_dict["checksum"] = checksum
json_dict["date_modified"] = date_modified
json_dict["filesize"] = filesize
json_dict["verbose_log_url"] = log_url
json_dict["arguments"] = arguments
json_dict["host_name"] = platform.node()
return json_dict
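    # Illustrative example (hypothetical values): a successful "retrieve" of
    # "tools/example.exe" from the Manifold store might produce
    #   {
    #       "unixname": "jdoe",
    #       "command": "retrieve",
    #       "object_name": "tools/example.exe",
    #       "object_store": "Manifold",
    #       "shelf": "tools",
    #       "return_code": 1,
    #       "checksum": "<sha1 hex digest>",
    #       "date_modified": "2020-01-01 00:00:00 +0000",
    #       "filesize": 1048576,
    #       "verbose_log_url": None,
    #       "arguments": None,
    #       "host_name": "host.example.com",
    #   }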