# analytics/download_count_wheels.py
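"""Estimate PyTorch wheel download counts from CloudFront access logs.

Downloads the previous day's gzipped CloudFront logs from the ``pytorch``
S3 bucket, tallies bytes sent per wheel, and divides by each wheel's size
to approximate download counts, broken down by OS, CUDA arch, and package
version.

Usage: python download_count_wheels.py
"""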
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import gzip
import os
import re
import urllib.parse

import boto3
from tqdm import tqdm

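# Shared S3 handles for the "pytorch" downloads bucket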
S3 = boto3.resource('s3')
CLIENT = boto3.client('s3')
BUCKET = S3.Bucket('pytorch')


class CacheEntry:
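    """Tracks bytes sent for a single download URI and derives wheel
    metadata (OS, CUDA arch, package name, version) from the filename."""
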
    def __init__(self, download_uri: str):
        self.download_uri = download_uri
        self.bytes_sent = 0
        self._size = None

    @property
    def os_type(self) -> str:
        os_type = "linux"
        if "win" in self.download_uri:
            os_type = "windows"
        elif "macosx" in self.download_uri:
            os_type = "macos"
        return os_type

    @property
    def target_arch(self) -> str:
        # Wheels without an explicit CUDA tag (e.g. "cu102") count as CPU
        target_arch = "cpu"
        result = re.search(r"cu[0-9]+", self.download_uri)
        if result:
            target_arch = result[0]
        return target_arch

    @property
    def package_name(self) -> str:
        # Wheel filenames look like "{name}-{version}-{tags}.whl"
        filename_contents = os.path.basename(self.download_uri).split('-')
        return filename_contents[0]

    @property
    def package_version(self) -> str:
        if "dev" in self.download_uri:
            results = re.search(
                r"[0-9]+\.[0-9]+\.[0-9]+\.dev[0-9]+",
                self.download_uri
            )
        else:
            results = re.search(
                r"[0-9]+\.[0-9]+\.[0-9]+", self.download_uri
            )
        if not results:
            raise RuntimeError(
                f"Could not parse version from {self.download_uri}"
            )
        return results[0]

    @property
    def size(self) -> int:
        # Look up the object's size in S3 once, then cache it
        if self._size is None:
            for key in BUCKET.objects.filter(
                Prefix=self.download_uri.lstrip("/")
            ):
                self._size = key.size
            if self._size is None:
                raise RuntimeError(
                    f"No object found for prefix {self.download_uri}"
                )
        return self._size

    @property
    def downloads(self):
        # Approximate downloads as total bytes served divided by file size
        return self.bytes_sent // self.size


def parse_logs(log_directory: str) -> dict:
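    """Walk the downloaded CloudFront logs and tally bytes sent per URI."""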
    bytes_cache = dict()
    for (dirpath, _, filenames) in os.walk(log_directory):
        for filename in tqdm(filenames):
            with gzip.open(os.path.join(dirpath, filename), 'r') as gf:
                string = gf.read().decode("utf-8")
                # CloudFront logs start with two header lines; skip them
                entries = string.splitlines()[2:]
                for entry in entries:
                    columns = entry.split('\t')
                    bytes_sent = int(columns[3])
                    # Log URIs may be percent-encoded twice, so unquote twice
                    download_uri = urllib.parse.unquote(
                        urllib.parse.unquote(columns[7])
                    )
                    status = columns[8]
                    # Only count successful (2xx) responses for wheels/zips
                    if not all([
                        status.startswith("2"),
                        download_uri.endswith((".whl", ".zip"))
                    ]):
                        continue
                    if download_uri not in bytes_cache:
                        bytes_cache[download_uri] = CacheEntry(download_uri)
                    bytes_cache[download_uri].bytes_sent += bytes_sent
    return bytes_cache


def output_results(bytes_cache: dict) -> None:
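    """Aggregate download counts by OS, CUDA arch, and package, then print
    a percentage breakdown of each."""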
    os_results = defaultdict(int)
    arch_results = defaultdict(int)
    package_results = defaultdict(lambda: defaultdict(int))
    for val in tqdm(bytes_cache.values()):
        try:
            os_results[val.os_type] += val.downloads
            arch_results[val.target_arch] += val.downloads
            package_results[val.package_name][val.package_version] += (
                val.downloads
            )
        except Exception:
            # Skip entries whose size or version can't be determined
            pass
print("=-=-= Results =-=-=")
print("=-=-= OS =-=-=")
total_os_num = sum(os_results.values())
for os_type, num in os_results.items():
print(
f"\t* {os_type}: {num} ({(num/total_os_num)*100:.2f}%)"
)
print("=-=-= ARCH =-=-=")
total_arch_num = sum(arch_results.values())
for arch_type, num in arch_results.items():
print(
f"\t* {arch_type}: {num} ({(num/total_arch_num) * 100:.2f}%)"
)
print("=-=-= By Package =-=-=")
for package_name, upper_val in package_results.items():
print(f"=-=-= {package_name} =-=-=")
total_package_num = sum(upper_val.values())
for package_version, num in upper_val.items():
print(
f"\t* {package_version}: {num} ({(num/total_package_num) * 100:.2f}%)"
)


def download_logs(log_directory: str) -> None:
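    """Download yesterday's gzipped CloudFront logs into log_directory."""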
    dt_now = datetime.now(timezone.utc)
    dt_end = datetime(dt_now.year, dt_now.month, dt_now.day, tzinfo=timezone.utc)
    # Add 1 hour of padding to account for potentially missed logs due to timing
    dt_start = dt_end - timedelta(days=1, hours=1)
    for key in tqdm(BUCKET.objects.filter(Prefix='cflogs')):
        remote_fname = key.key
        local_fname = os.path.join(log_directory, remote_fname)
        # Only download things from yesterday
        dt_modified = key.last_modified.replace(tzinfo=timezone.utc)
        if dt_start >= dt_modified or dt_end < dt_modified:
            continue
        # TODO: Do this in parallel
        if not os.path.exists(local_fname):
            os.makedirs(os.path.dirname(local_fname), exist_ok=True)
            CLIENT.download_file("pytorch", remote_fname, local_fname)


if __name__ == "__main__":
    print("Downloading logs")
    download_logs('cache')
    print("Parsing logs")
    cache = parse_logs('cache/cflogs/')
    print("Calculating results")
    output_results(cache)