in azext_edge/edge/providers/orchestration/clone.py [0:0]
def _handle_federation(self, use_self_hosted_issuer: Optional[bool] = None):
if not self.user_assigned_mis:
return
cluster_resource = self.connected_cluster.resource
oidc_issuer = self.instances._ensure_oidc_issuer(
cluster_resource, use_self_hosted_issuer=use_self_hosted_issuer
)
for mid in self.user_assigned_mis:
parsed_uami_id = parse_resource_id(mid)
msi_client = get_msi_mgmt_client(subscription_id=parsed_uami_id["subscription"])
credentials = list(
msi_client.federated_identity_credentials.list(
resource_group_name=parsed_uami_id["resource_group"], resource_name=parsed_uami_id["name"]
)
)
# We need an efficient way to federate credentials that are in scope.
# First we need to build context of prior federation. We enumerate existing credentials
# for every uami that is associated with the instance to build a dict of issuer to service account pairs
# and separately a map of service accounts in play. We then iterate through desired/target issuer
# to service account pairs to see if they are already present, where if not AND the service account
# is in play we federate with best attempt.
cred_map = {}
cluster_svc_acct_map = {}
expected_creds = [(oidc_issuer, SERVICE_ACCOUNT_SECRETSYNC), (oidc_issuer, SERVICE_ACCOUNT_DATAFLOW)]
for cred in credentials:
svc_acct = cred["properties"]["subject"].split(":")[-1]
cred_map[(cred["properties"]["issuer"], svc_acct)] = 1
cluster_svc_acct_map[svc_acct] = 1
for exp_cred in expected_creds:
if exp_cred not in cred_map and exp_cred[1] in cluster_svc_acct_map:
subject = f"system:serviceaccount:{self.namespace}:{exp_cred[1]}"
try:
# Federate with best attempt.
msi_client.federated_identity_credentials.create_or_update(
resource_group_name=parsed_uami_id["resource_group"],
resource_name=parsed_uami_id["name"],
federated_identity_credential_resource_name=get_fc_name(
cluster_name=self.cluster_name,
oidc_issuer=oidc_issuer,
subject=subject,
),
parameters={
"properties": {
"subject": subject,
"audiences": ["api://AzureADTokenExchange"],
"issuer": oidc_issuer,
}
},
)
except HttpResponseError as e:
logger.debug(e)