in hgserver/hgmolib/hgmolib/generate_hg_s3_bundles.py [0:0]
def generate_bundles(repo, upload=True, copyfrom=None, zstd_max=False):
    """Generate bundle files for a repository at a path.

    ``repo`` is a repository path relative to the common hg root (it must not
    be absolute). ``upload`` controls whether generated bundles are uploaded
    to the configured storage providers. ``copyfrom``, if set, names another
    repository whose clonebundles manifest is copied instead of generating
    bundles. ``zstd_max`` denotes whether to generate zstd bundles with
    maximum compression.
    """
    assert not os.path.isabs(repo)

    # If ``copyfrom`` is set, copy the clonebundles manifest from that source
    # repository instead of generating bundles. Return an empty dict because
    # this repository isn't canonical and its bundles don't need to be listed.
    if copyfrom:
        # We assume all paths are relative to a common root.
        assert not os.path.isabs(copyfrom)
        source_repo = os.path.join("/repo/hg/mozilla", copyfrom)
        dest_repo = os.path.join("/repo/hg/mozilla", repo)

        source = os.path.join(source_repo, ".hg", "clonebundles.manifest")
        dest = os.path.join(dest_repo, ".hg", "clonebundles.manifest")
        backup = os.path.join(dest_repo, ".hg", "clonebundles.manifest.last")

        # Create a backup of the last manifest so it can be restored easily.
        if os.path.exists(dest):
            print("copying %s -> %s" % (dest, backup))
            shutil.copy2(dest, backup)

        print("copying %s -> %s" % (source, dest))
        # copy2 copies metadata.
        shutil.copy2(source, dest)

        # Replicate manifest to mirrors.
        # Bug 1714463: don't replicate `copyfrom` for try.
        if repo != "try":
            subprocess.check_call([HG, "replicatesync"], cwd=dest_repo)

        return {}

    repo_full = os.path.join("/repo/hg/mozilla", repo)

    # Bundle files are named after the tip revision in the repository at
    # the time the bundle was created. This is the easiest way to name
    # bundle files.
    tip = subprocess.check_output(
        [HG, "-R", repo_full, "log", "-r", "tip", "-T", "{node}"]
    ).decode("latin-1")
    print("tip of %s is %s" % (repo, tip))

    debugformat_json = json.loads(
        subprocess.check_output([HG, "-R", repo_full, "debugformat", "-T", "json"])
    )

    if not any(
        format["name"] == "generaldelta" and format["repo"] is True
        for format in debugformat_json
    ):
        raise Exception("non-generaldelta repo not supported: %s" % repo_full)
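    # For reference, `hg debugformat -Tjson` emits a list of entries roughly
    # of the form {"name": "generaldelta", "repo": true, ...}; the exact set
    # of fields varies by Mercurial version.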

    bundle_dir = os.path.join(BUNDLE_ROOT, repo)

    # Create directory to hold bundle files.
    os.makedirs(bundle_dir, 0o755, exist_ok=True)

    # We keep the last bundle files around so we can reuse them if necessary.
    # Prune irrelevant files.
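    # Files starting with "." (hidden files) or with the current tip (the
    # bundles we may reuse below) are kept; everything else is stale.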
    for p in os.listdir(bundle_dir):
        if p.startswith(".") or p.startswith(tip):
            continue

        full = os.path.join(bundle_dir, p)
        print("removing old bundle file: %s" % full)
        os.unlink(full)

    # Bundle generation is pretty straightforward. We simply invoke
    # `hg bundle` for each type of bundle we're producing. We use ``-a``
    # to bundle all revisions currently in the repository.
    #
    # There is a race condition between discovering the tip revision and
    # bundling: it's possible for extra revisions beyond the observed tip to
    # sneak into the bundles. This is acceptable. Bundles are a best-effort
    # mechanism to offload clone load from the server. They don't have to be
    # exactly identical nor exactly as advertised.
    #
    # We write to temporary files then move them into place after generation.
    # This is because an aborted bundle process may result in a partial file,
    # which may confuse our don't-write-if-file-exists logic.
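    # generate_bundle() (defined elsewhere in this module) is expected to
    # write to temp_path and then move the result into final_path.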
    bundles = []

    fs = []
    with futures.ThreadPoolExecutor(CONCURRENT_THREADS) as e:
        for bundle_format, args in CREATES:
            # Only generate one of zstd or zstd-max since they are redundant.
            if bundle_format == "zstd" and zstd_max:
                continue
            if bundle_format == "zstd-max" and not zstd_max:
                continue

            final_path, remote_path = bundle_paths(bundle_dir, repo, tip, bundle_format)
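            # bundle_paths() (defined elsewhere in this module) is assumed to
            # return the local file path under BUNDLE_ROOT and the remote
            # object key, both derived from the repo name, tip and format.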

            temp_path = "%s.tmp" % final_path

            # Record that this bundle is relevant.
            bundles.append((bundle_format, final_path, remote_path))

            if os.path.exists(final_path):
                print("bundle already exists, skipping: %s" % final_path)
                continue

            fs.append(e.submit(generate_bundle, repo_full, temp_path, final_path, args))

    for f in fs:
        # Will re-raise exceptions.
        f.result()

    # The object path is keyed off the repository name so we can easily see
    # what is taking up space on the server.
    #
    # We upload directly to each EC2 region. This is worth explaining.
    #
    # S3 supports replication. However, replication occurs asynchronously
    # with the upload. This means there is a window between when the upload
    # completes and when the bundle is available in the other region. We
    # don't want to advertise the bundle until it is fully distributed, as
    # that would result in a 404 and a client failure. We could poll and wait
    # for replication to complete. However, there are similar issues with
    # using COPY...
    #
    # There is a COPY API on S3 that allows you to perform a remote copy
    # between regions. This seems like a perfect API, as it saves the
    # client from having to upload the same data to Amazon multiple times.
    # However, we've seen COPY operations take longer to complete than a
    # raw upload. See bug 1167732. Since bundles are being generated in a
    # datacenter that has plentiful bandwidth to S3, and because we
    # generally like operations to complete faster, we choose to simply
    # upload the bundle to multiple regions instead of employing COPY.
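    # S3_HOSTS, GCP_HOSTS and AZURE_HOSTS are module-level constants; per the
    # unpacking below they hold (host, bucket, region name), (bucket, region)
    # and (account URL, region, container) tuples respectively.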
    if upload:
        fs = []
        with futures.ThreadPoolExecutor(CONCURRENT_THREADS) as e:
            for host, bucket, name in S3_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    print("uploading to %s/%s/%s" % (host, bucket, remote_path))
                    fs.append(
                        e.submit(upload_to_s3, name, bucket, bundle_path, remote_path)
                    )

            for bucket, region in GCP_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    # Only upload stream clone bundles for GCP since we never serve
                    # the other bundle formats there.
                    if bundle_format != "stream-v2":
                        continue

                    print("uploading to %s/%s/%s" % (GCS_ENDPOINT, bucket, remote_path))
                    fs.append(
                        e.submit(
                            upload_to_gcpstorage,
                            region,
                            bucket,
                            bundle_path,
                            remote_path,
                        )
                    )

            azure_credentials = get_azure_credentials()
            for account_url, region, container in AZURE_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    # Only upload stream clone bundles for Azure since we never serve
                    # the other bundle formats there.
                    if bundle_format != "stream-v2":
                        continue

                    print(
                        "uploading to %s/%s/%s" % (account_url, container, remote_path)
                    )
                    fs.append(
                        e.submit(
                            upload_to_azure_storage,
                            azure_credentials,
                            account_url,
                            container,
                            bundle_path,
                            remote_path,
                        )
                    )

            # Future.result() will raise if a future raised. This will
            # abort script execution, which is fine since failure should
            # be rare given how reliable these storage services are.
            for f in fs:
                f.result()

    # Now assemble a manifest listing each bundle.
    paths = {}
    for bundle_format, final_path, remote_path in bundles:
        paths[bundle_format] = (remote_path, os.path.getsize(final_path))
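    # Illustrative shape of the returned mapping (hypothetical repo/sizes):
    #   {"zstd-max": ("mozilla-central/<tip>.zstd-max.hg", 1234567890), ...}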

    bundle_types = set(t[0] for t in bundles)

    clonebundles_manifest = []
    for bundle_format, validate_spec in CLONEBUNDLES_ORDER:
        if bundle_format not in bundle_types:
            continue

        final_path, remote_path = bundle_paths(bundle_dir, repo, tip, bundle_format)

        bundle_spec = (
            subprocess.run(
                [HG, "debugbundle", "--spec", final_path],
                check=True,
                stdout=subprocess.PIPE,
            )
            .stdout.decode("ascii")
            .strip()
        )

        validate_spec(bundle_spec, final_path)

        clonebundles_manifest.append(
            "%s/%s BUNDLESPEC=%s REQUIRESNI=true cdn=true"
            % (CDN, remote_path, bundle_spec)
        )
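        # Illustrative manifest line (hypothetical values):
        #   https://cdn.example.com/mozilla-central/<tip>.zstd-max.hg BUNDLESPEC=zstd-v2 REQUIRESNI=true cdn=true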

        # Prefer S3 buckets over GCP buckets for the time being,
        # so add them first.
        for host, bucket, name in S3_HOSTS:
            entry = "https://%s/%s/%s BUNDLESPEC=%s ec2region=%s" % (
                host,
                bucket,
                remote_path,
                bundle_spec,
                name,
            )
            clonebundles_manifest.append(entry)

        # Only add `stream-v2` bundles for GCP and Azure.
        if bundle_format == "stream-v2":
            for bucket, name in GCP_HOSTS:
                entry = "%s/%s/%s BUNDLESPEC=%s gceregion=%s" % (
                    GCS_ENDPOINT,
                    bucket,
                    remote_path,
                    bundle_spec,
                    name,
                )
                clonebundles_manifest.append(entry)

            for account_url, region, container_name in AZURE_HOSTS:
                entry = f"{account_url}/{container_name}/{remote_path} BUNDLESPEC={bundle_spec} azureregion={region}"
                clonebundles_manifest.append(entry)
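    # Manifest order matters: clients prefer entries listed earlier (after
    # filtering on attributes they support), so the CDN entry comes first,
    # then S3, then GCP/Azure.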

    backup_path = os.path.join(repo_full, ".hg", "clonebundles.manifest.old")
    clonebundles_path = os.path.join(repo_full, ".hg", "clonebundles.manifest")

    if os.path.exists(clonebundles_path):
        print("Copying %s -> %s" % (clonebundles_path, backup_path))
        shutil.copy2(clonebundles_path, backup_path)

    with open(clonebundles_path, "w") as fh:
        fh.write("\n".join(clonebundles_manifest))

    # Normalize permissions on the manifest file.
    os.chmod(clonebundles_path, 0o664)

    # Replicate manifest to mirrors.
    subprocess.check_call([HG, "replicatesync"], cwd=repo_full)

    return paths
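
# A hypothetical driver invocation (the real callers live elsewhere in this
# module/script):
#
#   generate_bundles("mozilla-central", upload=True, zstd_max=True)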