def branch_protection()

in asfyaml/feature/github/branch_protection.py [0:0]


def branch_protection(self: ASFGitHubFeature):
    # Branch protections
    if "protected_branches" not in self.yaml:
        return

    # Collect all branches and whether they have active branch protection rules
    try:
        refs = get_head_refs(self)
    except Exception as ex:
        print(f"Error: failed to retrieve current refs: {ex!s}")
        refs = []

    protected_branches = set()
    for ref in refs:
        name = ref["name"]
        branch_protection_rule = ref.get("branchProtectionRule")
        if branch_protection_rule is not None:
            protected_branches.add(name)

    branches = self.yaml.get("protected_branches", {})
    # If protected_branches is set to ~ (None), reset it to an empty map
    # We still need to remove existing branch protection rules from all existing branches later on
    if branches is None:
        branches = {}

    protection_changes = {}
    for branch, brsettings in branches.items():
        if branch in protected_branches:
            protected_branches.remove(branch)

        branch_changes = []
        try:
            ghbranch = self.ghrepo.get_branch(branch=branch)
        except pygithub.GithubException as e:
            if e.status == 404:  # No such branch, skip to next rule
                protection_changes[branch] = [f"Branch {branch} does not exist, protection could not be configured"]
                continue
            else:
                # propagate other errors, GitHub API might have an outage
                raise e

        # We explicitly disable force pushes when branch protections are enabled
        allow_force_push = False

        # Required signatures
        required_signatures = brsettings.get("required_signatures", NotSet)

        # Required linear history
        required_linear = brsettings.get("required_linear_history", NotSet)

        # Required conversation resolution
        # Requires all conversations to be resolved before merging is possible
        required_conversation_resolution = brsettings.get("required_conversation_resolution", NotSet)

        # Required pull requests reviews
        # As this is a nested object, we check for existence of the key to check if we should enable it
        if "required_pull_request_reviews" in brsettings:
            required_pull_request_reviews = brsettings.get("required_pull_request_reviews", {})

            required_approving_review_count = required_pull_request_reviews.get("required_approving_review_count", 0)
            require_code_owner_reviews = required_pull_request_reviews.get("require_code_owner_reviews")
            dismiss_stale_reviews = required_pull_request_reviews.get("dismiss_stale_reviews", NotSet)
            require_last_push_approval = required_pull_request_reviews.get("require_last_push_approval", NotSet)
        else:
            required_pull_request_reviews = NotSet
            required_approving_review_count = NotSet
            dismiss_stale_reviews = NotSet
            require_last_push_approval = NotSet
            require_code_owner_reviews = NotSet

        required_checks: Opt[list[tuple[str, int]]]

        # Required status checks
        if "required_status_checks" in brsettings:
            required_status_checks = brsettings.get("required_status_checks", {})

            # strict means "Require branches to be up to date before merging".
            require_strict = required_status_checks.get("strict", NotSet)

            contexts = required_status_checks.get("contexts", [])
            checks = required_status_checks.get("checks", [])
            checks_as_dict = {**{ctx: -1 for ctx in contexts}, **{c["context"]: int(c["app_id"]) for c in checks}}

            required_checks = list(checks_as_dict.items())

            # if no checks are defined, we remove the status checks completely
            if len(required_checks) == 0:
                required_status_checks = NotSet
        else:
            required_status_checks = NotSet
            require_strict = NotSet
            required_checks = NotSet

        # Log changes that will be applied
        try:
            live_branch_protection_settings = ghbranch.get_protection()
        except pygithub.GithubException:
            live_branch_protection_settings = None

        if (
            live_branch_protection_settings is None
            or allow_force_push != live_branch_protection_settings.allow_force_pushes
        ):
            branch_changes.append(f"Set allow force push to {allow_force_push}")

        if is_defined(required_signatures) and (
            live_branch_protection_settings is None
            or required_signatures != live_branch_protection_settings.required_signatures
        ):
            branch_changes.append(f"Set required signatures to {required_signatures}")

        if is_defined(required_linear) and (
            live_branch_protection_settings is None
            or required_linear != live_branch_protection_settings.required_linear_history
        ):
            branch_changes.append(f"Set required linear history to {required_linear}")

        if is_defined(required_conversation_resolution) and (
            live_branch_protection_settings is None
            or required_conversation_resolution != live_branch_protection_settings.required_conversation_resolution
        ):
            branch_changes.append(f"Set required conversation resolution to {required_conversation_resolution}")

        if is_defined(required_pull_request_reviews):
            if live_branch_protection_settings is None:
                live_reviews = None
            else:
                live_reviews = live_branch_protection_settings.required_pull_request_reviews

            if is_defined(required_approving_review_count) and (
                live_reviews is None or required_approving_review_count != live_reviews.required_approving_review_count
            ):
                branch_changes.append(f"Set required approving review count to {required_approving_review_count}")

            if is_defined(require_code_owner_reviews) and (
                live_reviews is None or require_code_owner_reviews != live_reviews.require_code_owner_reviews
            ):
                branch_changes.append(f"Set required code owner reviews to {require_code_owner_reviews}")

            if is_defined(dismiss_stale_reviews) and (
                live_reviews is None or dismiss_stale_reviews != live_reviews.dismiss_stale_reviews
            ):
                branch_changes.append(f"Set dismiss stale reviews to {dismiss_stale_reviews}")

            if is_defined(require_last_push_approval) and (
                live_reviews is None or require_last_push_approval != live_reviews.require_last_push_approval
            ):
                branch_changes.append(f"Set require last push approval to {require_last_push_approval}")

        if is_defined(required_status_checks):
            if live_branch_protection_settings is None:
                live_status_checks = None
            else:
                live_status_checks = live_branch_protection_settings.required_status_checks

            if is_defined(require_strict) and (
                live_status_checks is None or require_strict != live_status_checks.strict
            ):
                branch_changes.append(
                    f"Set require branches to be up to date before merging (strict) to {require_strict}"
                )

            # Always log the required checks that will be set for now. We will need to parse
            # the context field in a RequiredStatusChecks object.
            if is_defined(required_checks):
                branch_changes.append("Set required status contexts to the following:")
                for ctx, appid in required_checks:
                    branch_changes.append(f"  - {ctx} (app_id: {appid})")

        # Apply all the changes
        if not self.noop("protected_branches"):
            branch_protection_settings = ghbranch.edit_protection(
                allow_force_pushes=allow_force_push,
                required_linear_history=required_linear,
                required_conversation_resolution=required_conversation_resolution,
                required_approving_review_count=required_approving_review_count,
                dismiss_stale_reviews=dismiss_stale_reviews,
                require_code_owner_reviews=require_code_owner_reviews,
                require_last_push_approval=require_last_push_approval,
                strict=require_strict,
                checks=required_checks,  # type: ignore
            )

            if is_defined(required_signatures):
                if required_signatures and branch_protection_settings.required_signatures is False:
                    ghbranch.add_required_signatures()
                elif not required_signatures and branch_protection_settings.required_signatures is True:
                    ghbranch.remove_required_signatures()

            # if required pull requests are not enabled but present live, we need to explicitly remove them
            if (
                not is_defined(required_pull_request_reviews)
                and branch_protection_settings.required_pull_request_reviews is not None
            ):
                branch_changes.append("Remove required pull request reviews")
                ghbranch.remove_required_pull_request_reviews()

            # if required status checks are not enabled but present live, we need to explicitly remove them
            if not is_defined(required_status_checks) and branch_protection_settings.required_status_checks is not None:
                branch_changes.append("Remove required status checks")
                ghbranch.remove_required_status_checks()

        # Log all the changes we made to this branch
        if branch_changes:
            protection_changes[branch] = branch_changes

    # remove branch protection from all remaining protected branches
    for branch_name in protected_branches:
        branch = self.ghrepo.get_branch(branch_name)
        protection_changes[branch] = [f"Remove branch protection from branch '{branch_name}'"]

        if not self.noop("github::protected_branches"):
            branch.remove_protection()

    if protection_changes:
        summary = ""
        for branch, changes in protection_changes.items():
            summary += f"Updates to the {branch} branch:\n"
            for change in changes:
                summary += f"  - {change}\n"
        print(summary)