# s3_management/manage.py
#!/usr/bin/env python
import argparse
import tempfile
import time
from os import path, makedirs
from collections import defaultdict
from typing import Iterator, List, Type, Dict, Set, TypeVar, Optional
from re import sub, match
from packaging.version import parse
import botocore
import boto3
S3 = boto3.resource('s3')
CLIENT = boto3.client('s3')
BUCKET = S3.Bucket('pytorch')
ACCEPTED_FILE_EXTENSIONS = ("whl", "zip")
ACCEPTED_SUBDIR_PATTERNS = [
r"cu[0-9]+", # for cuda
r"rocm[0-9]+\.[0-9]+", # for rocm
"cpu",
]
PREFIXES_WITH_HTML = {
"whl": "torch_stable.html",
"whl/lts/1.8": "torch_lts.html",
"whl/nightly": "torch_nightly.html",
"whl/test": "torch_test.html",
}
# How many packages should we keep of a specific package?
KEEP_THRESHOLD = 60
S3IndexType = TypeVar('S3IndexType', bound='S3Index')
class S3Index:
    """In-memory view of the wheel/zip objects stored under one S3 prefix.

    Knows how to render the object list as the legacy single-page HTML
    index (e.g. torch_stable.html) and as PEP 503 "simple repository"
    pages, and how to upload the rendered HTML to S3 or save it locally.
    """

    def __init__(self, objects: List[str], prefix: str) -> None:
        # ``objects`` are sanitized S3 keys: "+" is already encoded as
        # "%2B" (see ``from_S3``).
        self.objects = objects
        self.prefix = prefix.rstrip("/")
        self.html_name = PREFIXES_WITH_HTML[self.prefix]
        # Dynamically grab subdirectories (like whl/test/cu101) from the
        # object keys so we don't need to add them manually anymore.
        # BUGFIX: the original filter ``if path.dirname != prefix`` compared
        # the *function* ``path.dirname`` to a string and was therefore
        # always True, so the root prefix only ended up in ``subdirs`` when
        # some object happened to live directly at the root.  Including the
        # root is desirable (an index is generated for it), so add it
        # explicitly and unconditionally.
        self.subdirs: Set[str] = {path.dirname(obj) for obj in objects}
        self.subdirs.add(self.prefix)

    def nightly_packages_to_show(self) -> Set[str]:
        """Finding packages to show based on a threshold we specify

        Basically takes our S3 packages, normalizes the version for easier
        comparisons, then iterates over normalized versions until we reach a
        threshold and then starts adding package to delete after that
        threshold has been reached

        After figuring out what versions we'd like to hide we iterate over
        our original object list again and pick out the full paths to the
        packages that are included in the list of versions to delete
        """
        # also includes versions without GPU specifier (i.e. cu102) for
        # easier sorting, sorts in reverse to put the most recent versions
        # first
        all_sorted_packages = sorted(
            {self.normalize_package_version(obj) for obj in self.objects},
            key=lambda name_ver: parse(name_ver.split('-', 1)[-1]),
            reverse=True,
        )
        packages: Dict[str, int] = defaultdict(int)
        to_hide: Set[str] = set()
        for obj in all_sorted_packages:
            package_name = path.basename(obj).split('-')[0]
            # Keep the newest KEEP_THRESHOLD versions of each package;
            # everything older gets hidden from the index.
            if packages[package_name] >= KEEP_THRESHOLD:
                to_hide.add(obj)
            else:
                packages[package_name] += 1
        return set(self.objects).difference({
            obj for obj in self.objects
            if self.normalize_package_version(obj) in to_hide
        })

    def is_obj_at_root(self, obj: str) -> bool:
        """Return True when ``obj`` lives directly under our prefix."""
        return path.dirname(obj) == self.prefix

    def _resolve_subdir(self, subdir: Optional[str] = None) -> str:
        """Default ``subdir`` to our prefix and strip trailing slashes."""
        if not subdir:
            subdir = self.prefix
        # make sure we strip any trailing slashes
        return subdir.rstrip("/")

    def gen_file_list(
        self,
        subdir: Optional[str] = None,
        package_name: Optional[str] = None
    ) -> Iterator[str]:
        """Yield the objects that belong in the index for ``subdir``.

        Objects at the root prefix are always yielded too (see the note in
        ``to_legacy_html``).  For the nightly prefix, versions beyond
        KEEP_THRESHOLD are filtered out first.  When ``package_name`` is
        given, only that package's files are yielded.
        """
        objects = (
            self.nightly_packages_to_show() if self.prefix == 'whl/nightly'
            else self.objects
        )
        subdir = self._resolve_subdir(subdir) + '/'
        for obj in objects:
            if package_name is not None:
                if self.obj_to_package_name(obj) != package_name:
                    continue
            if self.is_obj_at_root(obj) or obj.startswith(subdir):
                yield obj

    def get_package_names(self, subdir: Optional[str] = None) -> List[str]:
        """Return the sorted, de-duplicated package names under ``subdir``."""
        return sorted(set(self.obj_to_package_name(obj) for obj in self.gen_file_list(subdir)))

    def normalize_package_version(self, obj: str) -> str:
        """Reduce an object key to "<name>-<version>".

        Removes the GPU specifier from the package name as well as
        unnecessary things like the file extension, architecture name, etc.
        (e.g. "whl/cu102/torch-1.9.0%2Bcu102-cp36-...whl" -> "torch-1.9.0").
        """
        return sub(
            r"%2B.*",
            "",
            "-".join(path.basename(obj).split("-")[:2])
        )

    def obj_to_package_name(self, obj: str) -> str:
        """Extract the package name (text before the first dash)."""
        return path.basename(obj).split('-', 1)[0]

    def to_legacy_html(
        self,
        subdir: Optional[str] = None
    ) -> str:
        """Generates a string that can be used as the HTML index

        Takes our objects and transforms them into HTML that have
        historically been used by pip for installing pytorch.

        NOTE: These are not PEP 503 compliant but are here for legacy
        purposes
        """
        out: List[str] = []
        subdir = self._resolve_subdir(subdir)
        is_root = subdir == self.prefix
        for obj in self.gen_file_list(subdir):
            # Strip our prefix
            sanitized_obj = obj.replace(subdir, "", 1)
            if sanitized_obj.startswith('/'):
                sanitized_obj = sanitized_obj.lstrip("/")
            # we include objects at our root prefix so that users can still
            # install packages like torchaudio / torchtext even if they want
            # to install a specific GPU arch of torch / torchvision
            if not is_root and self.is_obj_at_root(obj):
                # strip root prefix and link relative to the subdir page
                sanitized_obj = obj.replace(self.prefix, "", 1).lstrip("/")
                sanitized_obj = f"../{sanitized_obj}"
            out.append(f'<a href="{sanitized_obj}">{sanitized_obj}</a><br/>')
        return "\n".join(sorted(out))

    def to_simple_package_html(
        self,
        subdir: Optional[str],
        package_name: str
    ) -> str:
        """Generates a string that can be used as the package simple HTML index
        """
        out: List[str] = []
        # Adding html header
        out.append('<!DOCTYPE html>')
        out.append('<html>')
        out.append('  <body>')
        out.append('    <h1>Links for {}</h1>'.format(package_name))
        for obj in sorted(self.gen_file_list(subdir, package_name)):
            # href keeps the %2B encoding; the link text shows a literal "+"
            out.append(f'    <a href="/{obj}">{path.basename(obj).replace("%2B","+")}</a><br/>')
        # Adding html footer
        out.append('  </body>')
        out.append('</html>')
        out.append('<!--TIMESTAMP {}-->'.format(int(time.time())))
        return '\n'.join(out)

    def to_simple_packages_html(
        self,
        subdir: Optional[str],
    ) -> str:
        """Generates a string that can be used as the simple HTML index
        """
        out: List[str] = []
        # Adding html header
        out.append('<!DOCTYPE html>')
        out.append('<html>')
        out.append('  <body>')
        for pkg_name in sorted(self.get_package_names(subdir)):
            out.append(f'    <a href="{pkg_name}/">{pkg_name}</a><br/>')
        # Adding html footer
        out.append('  </body>')
        out.append('</html>')
        out.append('<!--TIMESTAMP {}-->'.format(int(time.time())))
        return '\n'.join(out)

    def upload_legacy_html(self) -> None:
        """Upload the legacy HTML index for every known subdirectory."""
        for subdir in self.subdirs:
            print(f"INFO Uploading {subdir}/{self.html_name}")
            BUCKET.Object(
                key=f"{subdir}/{self.html_name}"
            ).put(
                ACL='public-read',
                CacheControl='no-cache,no-store,must-revalidate',
                ContentType='text/html',
                Body=self.to_legacy_html(subdir=subdir)
            )

    def upload_pep503_htmls(self) -> None:
        """Upload PEP 503 index pages (one per subdir + one per package)."""
        for subdir in self.subdirs:
            print(f"INFO Uploading {subdir}/index.html")
            BUCKET.Object(
                key=f"{subdir}/index.html"
            ).put(
                ACL='public-read',
                CacheControl='no-cache,no-store,must-revalidate',
                ContentType='text/html',
                Body=self.to_simple_packages_html(subdir=subdir)
            )
            for pkg_name in self.get_package_names(subdir=subdir):
                print(f"INFO Uploading {subdir}/{pkg_name}/index.html")
                BUCKET.Object(
                    key=f"{subdir}/{pkg_name}/index.html"
                ).put(
                    ACL='public-read',
                    CacheControl='no-cache,no-store,must-revalidate',
                    ContentType='text/html',
                    Body=self.to_simple_package_html(subdir=subdir, package_name=pkg_name)
                )

    def save_legacy_html(self) -> None:
        """Write the legacy HTML index for every subdirectory to disk."""
        for subdir in self.subdirs:
            print(f"INFO Saving {subdir}/{self.html_name}")
            makedirs(subdir, exist_ok=True)
            with open(path.join(subdir, self.html_name), mode="w", encoding="utf-8") as f:
                f.write(self.to_legacy_html(subdir=subdir))

    def save_pep503_htmls(self) -> None:
        """Write the PEP 503 index pages for every subdirectory to disk."""
        for subdir in self.subdirs:
            print(f"INFO Saving {subdir}/index.html")
            makedirs(subdir, exist_ok=True)
            with open(path.join(subdir, "index.html"), mode="w", encoding="utf-8") as f:
                f.write(self.to_simple_packages_html(subdir=subdir))
            for pkg_name in self.get_package_names(subdir=subdir):
                makedirs(path.join(subdir, pkg_name), exist_ok=True)
                with open(path.join(subdir, pkg_name, "index.html"), mode="w", encoding="utf-8") as f:
                    f.write(self.to_simple_package_html(subdir=subdir, package_name=pkg_name))

    @classmethod
    def from_S3(cls, prefix: str) -> "S3Index":
        """List the bucket under ``prefix`` and build an index from it.

        Only .whl/.zip files directly under the prefix or under an accepted
        subdirectory pattern (cuXXX / rocmX.Y / cpu) are kept.
        """
        objects = []
        prefix = prefix.rstrip("/")
        for obj in BUCKET.objects.filter(Prefix=prefix):
            # NOTE(review): ``prefix`` is interpolated into the regex
            # unescaped, so the "." in "whl/lts/1.8" matches any character.
            # Harmless for the known prefixes but worth re.escape-ing if
            # new prefixes are added — confirm before relying on this.
            is_acceptable = any([path.dirname(obj.key) == prefix] + [
                match(
                    f"{prefix}/{pattern}",
                    path.dirname(obj.key)
                )
                for pattern in ACCEPTED_SUBDIR_PATTERNS
            ]) and obj.key.endswith(ACCEPTED_FILE_EXTENSIONS)
            if is_acceptable:
                # Encode "+" so the key can be embedded verbatim in URLs.
                sanitized_key = obj.key.replace("+", "%2B")
                objects.append(sanitized_key)
        return cls(objects, prefix)
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for this tool.

    Accepts a positional ``prefix`` (one of the known HTML prefixes or
    "all") plus the ``--do-not-upload`` and ``--generate-pep503`` flags.

    BUGFIX: the descriptive sentence was previously passed as the first
    positional argument of ArgumentParser, which is ``prog`` (the program
    name shown in the usage line), not ``description``.
    """
    parser = argparse.ArgumentParser(
        description="Manage S3 HTML indices for PyTorch"
    )
    parser.add_argument(
        "prefix",
        type=str,
        choices=list(PREFIXES_WITH_HTML.keys()) + ["all"]
    )
    parser.add_argument("--do-not-upload", action="store_true")
    parser.add_argument("--generate-pep503", action="store_true")
    return parser
def main() -> None:
    """CLI entry point: (re)generate indices for one prefix or all of them.

    BUGFIX: ``--generate-pep503`` used to be silently ignored when the
    prefix was 'all'; PEP 503 pages are now saved/uploaded in that case
    too, matching the single-prefix behavior.  The duplicated save/upload
    logic is also collapsed into one loop.
    """
    args = create_parser().parse_args()
    action = "Saving" if args.do_not_upload else "Uploading"
    prefixes = list(PREFIXES_WITH_HTML.keys()) if args.prefix == 'all' else [args.prefix]
    for prefix in prefixes:
        print(f"INFO: {action} indices for '{prefix}'")
        idx = S3Index.from_S3(prefix=prefix)
        if args.do_not_upload:
            idx.save_legacy_html()
            if args.generate_pep503:
                idx.save_pep503_htmls()
        else:
            idx.upload_legacy_html()
            if args.generate_pep503:
                idx.upload_pep503_htmls()


if __name__ == "__main__":
    main()