def _handle_federation()

in azext_edge/edge/providers/orchestration/clone.py [0:0]


    def _handle_federation(self, use_self_hosted_issuer: Optional[bool] = None):
        """
        Federate the instance's user-assigned managed identities with the cluster OIDC issuer.

        Nothing happens when `self.user_assigned_mis` is empty. Otherwise the OIDC issuer is
        obtained through `self.instances._ensure_oidc_issuer` and, for every managed identity:

        1. Its existing federated identity credentials are listed and reduced to
           (issuer, service account) pairs plus the set of service accounts seen.
        2. For each target pair (the issuer combined with the secret sync and dataflow
           service accounts), a credential is created only when that exact pair is absent
           AND the service account already shows up in one of the existing credentials.

        Creation is a best attempt: an `HttpResponseError` is logged at debug level and ignored.

        :param use_self_hosted_issuer: Forwarded unchanged to `_ensure_oidc_issuer`.
        """
        if not self.user_assigned_mis:
            return

        connected_cluster_resource = self.connected_cluster.resource
        oidc_issuer = self.instances._ensure_oidc_issuer(
            connected_cluster_resource, use_self_hosted_issuer=use_self_hosted_issuer
        )

        # The target pairs only depend on the issuer, so they are the same for every identity.
        target_pairs = (
            (oidc_issuer, SERVICE_ACCOUNT_SECRETSYNC),
            (oidc_issuer, SERVICE_ACCOUNT_DATAFLOW),
        )

        for uami_id in self.user_assigned_mis:
            uami_parts = parse_resource_id(uami_id)
            client = get_msi_mgmt_client(subscription_id=uami_parts["subscription"])
            identity_scope = {
                "resource_group_name": uami_parts["resource_group"],
                "resource_name": uami_parts["name"],
            }
            existing_creds = list(client.federated_identity_credentials.list(**identity_scope))

            # Context of prior federation for this identity.
            federated_pairs = set()
            svc_accts_in_play = set()
            for existing in existing_creds:
                # The service account is the last ':' separated segment of the subject.
                svc_acct = existing["properties"]["subject"].rsplit(":", 1)[-1]
                federated_pairs.add((existing["properties"]["issuer"], svc_acct))
                svc_accts_in_play.add(svc_acct)

            for target in target_pairs:
                target_svc_acct = target[1]
                # Skip pairs that are already federated or whose service account is not in play.
                if target in federated_pairs or target_svc_acct not in svc_accts_in_play:
                    continue

                subject = f"system:serviceaccount:{self.namespace}:{target_svc_acct}"
                try:
                    # Best attempt: a failure here must not abort the remaining federation work.
                    client.federated_identity_credentials.create_or_update(
                        federated_identity_credential_resource_name=get_fc_name(
                            cluster_name=self.cluster_name,
                            oidc_issuer=oidc_issuer,
                            subject=subject,
                        ),
                        parameters={
                            "properties": {
                                "subject": subject,
                                "audiences": ["api://AzureADTokenExchange"],
                                "issuer": oidc_issuer,
                            }
                        },
                        **identity_scope,
                    )
                except HttpResponseError as e:
                    logger.debug(e)