modules/azure-identity/azure-id-lifecycle/scripts/azad_sync_job_schema_modify.py (62 lines of code) (raw):

import argparse import os import requests import json def read_user_mapping_attributes_json(filename): f = open(os.path.join(os.getcwd(), filename), 'r') user_mapping_attribute = f.read() return json.loads(user_mapping_attribute) # https://learn.microsoft.com/en-us/graph/api/synchronization-synchronizationschema-get def fetch_job_schema(service_principal_id, job_id, headers): job_schema_url = "https://graph.microsoft.com/v1.0/servicePrincipals/{}/synchronization/jobs/{}/schema".format( service_principal_id, job_id) try: get_response = requests.get(job_schema_url, headers=headers) get_response.raise_for_status() except requests.exceptions.HTTPError as e: print(e.args[0]) job_schema_json = get_response.json() return job_schema_json # https://learn.microsoft.com/en-us/graph/api/synchronization-synchronizationschema-update def update_job_schema(service_principal_id, job_id, headers, payload): job_schema_url = "https://graph.microsoft.com/v1.0/servicePrincipals/{}/synchronization/jobs/{}/schema".format( service_principal_id, job_id) try: update_response = requests.put(job_schema_url, json=payload, headers=headers) update_response.raise_for_status() if update_response.status_code == 204: print('schema updated successfully') else: print('schema updated failed with code: ', update_response.status_code, ' \nerror body: ', update_response.content) except requests.exceptions.HTTPError as e: print(e.args[0]) def msgraph_api_access_token(): az_token_key = 'AZ_TOKEN' try: os.environ[az_token_key] except KeyError: raise Exception(f"env var ", az_token_key, " is not set. execute to set 'export AZ_TOKEN=$(az account get-access-token --resource-type ms-graph | jq -r .accessToken)' ") return os.environ.get(az_token_key) def azad_sync_job_schema_modify(service_principal_id, provision_job_id): r_headers = {'Authorization': 'Bearer ' + msgraph_api_access_token(), 'Accept': 'application/json'} # fetch existing job schema job_schema_json = fetch_job_schema(service_principal_id=service_principal_id, job_id=provision_job_id, headers=r_headers) # add new attributes to existing job schema sra = job_schema_json['synchronizationRules'] for sr in sra: if sr['name'] == 'USERGROUP_OUTBOUND_USERGROUP' and sr['sourceDirectoryName'] == 'Microsoft Entra ID': objMaps = sr['objectMappings'] for om in objMaps: if om['name'] == 'Provision Microsoft Entra ID Users' and om['sourceObjectName'] == 'User': # read federated_user attribute federated_user_attr = read_user_mapping_attributes_json( filename='scripts/federated_user_mapping_attribute.json') om['attributeMappings'].append(federated_user_attr) # read bypass notif attribute bypass_notification_attr = read_user_mapping_attributes_json( 'scripts/bypass_notification_mapping_attribute.json') om['attributeMappings'].append(bypass_notification_attr) update_job_schema(service_principal_id, job_id=provision_job_id, headers=r_headers, payload=job_schema_json) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-sp', '--service_principal_id', help='Service principal id', required=True) parser.add_argument('-pj', '--provision_job_id', help='SSO identity Provision job Id', required=True) args = parser.parse_args() azad_sync_job_schema_modify(args.service_principal_id, args.provision_job_id)