def generate_bundles()

in hgserver/hgmolib/hgmolib/generate_hg_s3_bundles.py


def generate_bundles(repo, upload=True, copyfrom=None, zstd_max=False):
    """Generate bundle files for a repository at a path.

    ``zstd_max`` denotes whether to generate zstd bundles with maximum
    compression.
    """
    assert not os.path.isabs(repo)

    # Copy manifest files from the listed source repository. Return an
    # empty dict because we don't need to list bundles: this repo isn't
    # canonical.
    if copyfrom:
        # We assume all paths are relative to a common root.
        assert not os.path.isabs(copyfrom)
        source_repo = os.path.join("/repo/hg/mozilla", copyfrom)
        dest_repo = os.path.join("/repo/hg/mozilla", repo)
        source = os.path.join(source_repo, ".hg", "clonebundles.manifest")
        dest = os.path.join(dest_repo, ".hg", "clonebundles.manifest")
        backup = os.path.join(dest_repo, ".hg", "clonebundles.manifest.last")

        # Create a backup of the last manifest so it can be restored easily.
        if os.path.exists(dest):
            print("copying %s -> %s" % (dest, backup))
            shutil.copy2(dest, backup)

        print("copying %s -> %s" % (source, dest))

        # copy2 copies metadata.
        shutil.copy2(source, dest)

        # Replicate manifest to mirrors.
        # Bug 1714463: don't replicate `copyfrom` for try
        if repo != "try":
            subprocess.check_call([HG, "replicatesync"], cwd=dest_repo)

        return {}

    repo_full = os.path.join("/repo/hg/mozilla", repo)

    # Bundle files are named after the tip revision in the repository at
    # the time the bundle was created. This is the easiest way to name
    # bundle files.
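    # bundle_paths() is assumed to derive bundle file names from this tip
    # hash; the pruning loop below relies on names starting with the hash.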
    tip = subprocess.check_output(
        [HG, "-R", repo_full, "log", "-r", "tip", "-T", "{node}"]
    ).decode("latin-1")
    print("tip of %s is %s" % (repo, tip))

    debugformat_json = json.loads(
        subprocess.check_output([HG, "-R", repo_full, "debugformat", "-T", "json"])
    )
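    # Each debugformat entry is assumed to be a dict like
    # {"name": "generaldelta", "repo": True, ...}; the check below requires
    # generaldelta to be enabled on this repository.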
    if not any(
        fmt["name"] == "generaldelta" and fmt["repo"] is True
        for fmt in debugformat_json
    ):
        raise Exception("non-generaldelta repo not supported: %s" % repo_full)

    bundle_dir = os.path.join(BUNDLE_ROOT, repo)

    # Create directory to hold bundle files.
    os.makedirs(bundle_dir, 0o755, exist_ok=True)

    # We keep the last bundle files around so we can reuse them if necessary.
    # Prune irrelevant files.
    for p in os.listdir(bundle_dir):
        if p.startswith(".") or p.startswith(tip):
            continue

        full = os.path.join(bundle_dir, p)
        print("removing old bundle file: %s" % full)
        os.unlink(full)

    # Bundle generation is pretty straightforward. We simply invoke
    # `hg bundle` for each type of bundle we're producing. We use ``-a``
    # to bundle all revisions currently in the repository.
    #
    # There is a race condition between discovering the tip revision and
    # bundling: it's possible for extra revisions beyond the observed tip to
    # sneak into the bundles. This is acceptable. Bundles are a best-effort
    # mechanism to offload clone load from the server; they don't have to be
    # identical to each other nor exactly as advertised.
    #
    # We write to temporary files then move them into place after generation.
    # This is because an aborted bundle process may result in a partial file,
    # which may confuse our don't-write-if-file-exists logic.
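    # CREATES is assumed to be a sequence of (bundle_format, hg_bundle_args)
    # pairs, e.g. ("gzip-v2", ["-a", "-t", "gzip-v2"]), one entry per bundle
    # flavor we produce.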

    bundles = []
    fs = []
    with futures.ThreadPoolExecutor(CONCURRENT_THREADS) as e:
        for bundle_format, args in CREATES:
            # Only generate one of zstd or zstd-max since they are redundant.
            if bundle_format == "zstd" and zstd_max:
                continue

            if bundle_format == "zstd-max" and not zstd_max:
                continue

            final_path, remote_path = bundle_paths(bundle_dir, repo, tip, bundle_format)
            temp_path = "%s.tmp" % final_path

            # Record that this bundle is relevant.
            bundles.append((bundle_format, final_path, remote_path))

            if os.path.exists(final_path):
                print("bundle already exists, skipping: %s" % final_path)
                continue

            fs.append(e.submit(generate_bundle, repo_full, temp_path, final_path, args))

    for f in fs:
        # Will re-raise exceptions.
        f.result()

    # Object path is keyed off the repository name so we can easily see what
    # is taking up space on the server.
    #
    # We upload directly to each EC2 region. This is worth explaining.
    #
    # S3 supports replication. However, replication occurs asynchronously
    # with the upload. This means there is a window between when upload
    # completes and when the bundle is available in the other region. We
    # don't want to advertise the bundle until it is distributed, as this
    # would result in a 404 and client failure. We could poll and wait for
    # replication to complete. However, there are similar issues with
    # using COPY...
    #
    # There is a COPY API on S3 that allows you to perform a remote copy
    # between regions. This seems like a perfect API, as it saves the
    # client from having to upload the same data to Amazon multiple times.
    # However, we've seen COPY operations take longer to complete than a
    # raw upload. See bug 1167732. Since bundles are being generated in a
    # datacenter that has plentiful bandwidth to S3 and because we
    # generally like operations to complete faster, we choose to simply
    # upload the bundle to multiple regions instead of employing COPY.
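    # The host constants are assumed to have the following shapes, matching
    # how they are unpacked below:
    #   S3_HOSTS:    (host, bucket, region_name) tuples
    #   GCP_HOSTS:   (bucket, region) tuples
    #   AZURE_HOSTS: (account_url, region, container) tuples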
    if upload:
        fs = []
        with futures.ThreadPoolExecutor(CONCURRENT_THREADS) as e:
            for host, bucket, name in S3_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    print("uploading to %s/%s/%s" % (host, bucket, remote_path))
                    fs.append(
                        e.submit(upload_to_s3, name, bucket, bundle_path, remote_path)
                    )

            for bucket, region in GCP_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    # Only upload stream clone bundles for GCP since we never serve
                    # the other bundle formats there.
                    if bundle_format != "stream-v2":
                        continue

                    print("uploading to %s/%s/%s" % (GCS_ENDPOINT, bucket, remote_path))
                    fs.append(
                        e.submit(
                            upload_to_gcpstorage,
                            region,
                            bucket,
                            bundle_path,
                            remote_path,
                        )
                    )

            azure_credentials = get_azure_credentials()
            for account_url, region, container in AZURE_HOSTS:
                for bundle_format, bundle_path, remote_path in bundles:
                    # Only upload stream clone bundles for Azure since we never serve
                    # the other bundle formats there.
                    if bundle_format != "stream-v2":
                        continue

                    print(
                        "uploading to %s/%s/%s" % (account_url, container, remote_path)
                    )
                    fs.append(
                        e.submit(
                            upload_to_azure_storage,
                            azure_credentials,
                            account_url,
                            container,
                            bundle_path,
                            remote_path,
                        )
                    )

        # Future.result() will raise if a future raised. This will
        # abort script execution, which is fine since failure should
        # be rare given how reliable S3 is.
        for f in fs:
            f.result()

    # Now assemble a manifest listing each bundle.
    paths = {}
    for bundle_format, final_path, remote_path in bundles:
        paths[bundle_format] = (remote_path, os.path.getsize(final_path))

    bundle_types = set(t[0] for t in bundles)

    clonebundles_manifest = []
    for bundle_format, validate_spec in CLONEBUNDLES_ORDER:
        if bundle_format not in bundle_types:
            continue

        final_path, remote_path = bundle_paths(bundle_dir, repo, tip, bundle_format)
        bundle_spec = (
            subprocess.run(
                [HG, "debugbundle", "--spec", final_path],
                check=True,
                stdout=subprocess.PIPE,
            )
            .stdout.decode("ascii")
            .strip()
        )
        validate_spec(bundle_spec, final_path)
        clonebundles_manifest.append(
            "%s/%s BUNDLESPEC=%s REQUIRESNI=true cdn=true"
            % (CDN, remote_path, bundle_spec)
        )
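        # The resulting CDN entry looks something like (hypothetical host):
        #   https://cdn.example.com/<remote_path> BUNDLESPEC=gzip-v2 REQUIRESNI=true cdn=true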

        # Prefer S3 buckets over GCP buckets for the time being, so add
        # them first.
        for host, bucket, name in S3_HOSTS:
            entry = "https://%s/%s/%s BUNDLESPEC=%s ec2region=%s" % (
                host,
                bucket,
                remote_path,
                bundle_spec,
                name,
            )
            clonebundles_manifest.append(entry)

        # Only add `stream-v2` bundles for GCP and Azure.
        if bundle_format == "stream-v2":
            for bucket, name in GCP_HOSTS:
                entry = "%s/%s/%s BUNDLESPEC=%s gceregion=%s" % (
                    GCS_ENDPOINT,
                    bucket,
                    remote_path,
                    bundle_spec,
                    name,
                )
                clonebundles_manifest.append(entry)

            for account_url, region, container_name in AZURE_HOSTS:
                entry = f"{account_url}/{container_name}/{remote_path} BUNDLESPEC={bundle_spec} azureregion={region}"
                clonebundles_manifest.append(entry)

    backup_path = os.path.join(repo_full, ".hg", "clonebundles.manifest.old")
    clonebundles_path = os.path.join(repo_full, ".hg", "clonebundles.manifest")

    if os.path.exists(clonebundles_path):
        print("Copying %s -> %s" % (clonebundles_path, backup_path))
        shutil.copy2(clonebundles_path, backup_path)

    with open(clonebundles_path, "w") as fh:
        fh.write("\n".join(clonebundles_manifest))

    # Normalize permissions on the manifest file.
    os.chmod(clonebundles_path, 0o664)

    # Replicate manifest to mirrors.
    subprocess.check_call([HG, "replicatesync"], cwd=repo_full)

    return paths
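

A minimal usage sketch follows. It is hypothetical and not part of the file
above: the repository names, option combinations, and the driver loop itself
are illustrative assumptions, not the script's actual entry point.

    # Hypothetical driver: invoke generate_bundles() once per repository.
    repos = [
        ("mozilla-central", {"zstd_max": True}),
        ("try", {"upload": False}),
        ("hypothetical-mirror", {"copyfrom": "mozilla-central"}),
    ]

    for repo, kwargs in repos:
        # Returns {bundle_format: (remote_path, size)} for canonical repos,
        # or {} when the manifest was copied via ``copyfrom``.
        paths = generate_bundles(repo, **kwargs)
        print("%s: %s" % (repo, sorted(paths)))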