jobs/eam-integrations/scripts/api/XMatters/XMatters.py (497 lines of code) (raw):

import json import logging import re import requests # from integrations.api.connectors import Util from api.util import Util # from integrations.api.connectors.XMatters.secrets_xmatters import config as xm_config from api.XMatters.secrets_xmatters import config as xm_config logger = logging.getLogger(__name__) USE_BASIC_AUTH = True class LocalConfig(object): def __init__(self): new_api_suffix = "/api/xm/1" old_api_suffix = "/reapi/2015-04-01/" self.debug = 3 self.base_URL = xm_config["url"] + new_api_suffix self.base_URL_old_api = xm_config["url"] + old_api_suffix self.access_token = False def __getattr__(self, attr): return xm_config[attr] _config = LocalConfig() def get_access_token(): if not _config.access_token: _config.access_token = _get_access_token() return _config.access_token def _get_access_token(): endpoint_URL = "/oauth2/token" grant_type = "password" url = ( _config.base_URL + endpoint_URL + "?grant_type=" + grant_type + "&client_id=" + _config.xm_client_id + "&username=" + _config.xm_username + "&password=" + _config.xm_password ) if USE_BASIC_AUTH: return "" headers = {"Content-Type": "application/json"} response = requests.post(url, headers=headers, proxies=_config.proxies) if response.status_code == 200: rjson = response.json() access_token = rjson.get("access_token") else: print(response.status_code) logger.critical(response.json()) error = "Could not get an access token" logger.critical(error) raise Exception(error) return access_token # get all xmatters sites # OLD API # https://help.xmatters.com/OnDemand/xmodwelcome/communicationplanbuilder/appendixrestapi.htm?cshid=apiGETsites#GETsites # def get_all_sites(): """Gets all sites from xMatters Parameters: None Returns: dict: site name -> xmatters site id for all sites with status "ACTIVE" Example: { [...] 'US Remote (NV)' : '656173de-809d-46c3-82f3-dd718efa6af4', 'Israel Remote' : '7ed98c24-c73f-4fa5-b987-a005e80e2d63', 'US Remote (NM)' : '5224215b-337f-4c6b-bc6a-6211246e2086', 'US Remote (LA)' : '96c2de74-247d-4bb8-b0ec-592b86768f85', [...] } """ # https://mozilla-np.xmatters.com/xmatters.com/reapi/2015-04-01/sites logger.info("\n") logger.info("Gathering all XMatters sites") all_sites_url = _config.base_URL_old_api + "sites" xm_sites = {} xm_sites_inactive = {} while True: response = requests.get( all_sites_url, auth=(_config.xm_username, _config.xm_password), proxies=_config.proxies, ) if response.status_code == 200: rjson = response.json() logger.debug(rjson) else: error = "Could not get sites" logger.critical(error) raise Exception(error) for site in rjson["sites"]: logger.debug(site["name"] + " -- " + site["identifier"]) logger.debug(site) if site["status"] == "ACTIVE": xm_sites[site["name"]] = site["identifier"] else: xm_sites_inactive[site["name"]] = site["identifier"] if rjson["nextRecordsURL"] == "": logger.debug("No nextRecordsURL found. done with fetching") break else: logger.debug("NEXT RECORDS URL FOUND: %s" % rjson["nextRecordsURL"]) all_sites_url = _config.url + rjson["nextRecordsURL"] return xm_sites, xm_sites_inactive # get all people from xmatters # NEW API # https://help.xmatters.com/xmAPI/?python#get-people # def get_all_people(): """Gets all users/people from xMatters Parameters: None Returns: dict: name -> { attributes } Example: { [...] 'test@mozilla.com' : { u'recipientType': u'PERSON', u'status': u'ACTIVE', u'firstName': u'Chris', u'lastName': u'Test', u'links': {u'self': u'/api/xm/1/people/fc97c634-6e9b-4788-bdca-xxxxxxxxxxxx'}, u'externallyOwned': False, u'site': { u'id': u'2e6f8d1c-ced7-460e-bf5a-902109021090', u'links': { u'self': u'/api/xm/1/sites/2e6f8d1c-ced7-460e-bf5a-902109021090' }, u'name': u'Beverly Hills Office' }, u'properties': { u'Functional Group': u'IT', u'Executive': False, u'Manager Email': u'bbixby@mozilla.com', u'Home Country': u'United States of America', u'Cost Center': u'1440 - Enterprise Applications and Services (EApps)', u'Manager': u'Bill Bixby', u'Home Zipcode': u'90210', u'MERT/EVAC/Warden': False, u'Home City': u'Beverly Hills' }, u'language': u'en', u'webLogin': u'test@mozilla.com', u'timezone': u'US/Pacific', u'targetName': u'test@mozilla.com', u'id': u'fc97c634-6e9b-4788-bdca-xxxxxxxxxxxx' }, [...] } """ # https://mozilla-np.xmatters.com # https://mozilla-np.xmatters.com/api/xm/ logger.info("\n") logger.info("Gathering all XMatters people") url = _config.base_URL + "/people" headers = {"Authorization": "Bearer " + get_access_token()} if USE_BASIC_AUTH: headers = {} xm_people = {} while True: if USE_BASIC_AUTH: response = requests.get( url, auth=(_config.xm_username, _config.xm_password), proxies=_config.proxies, ) else: response = requests.get(url, headers=headers, proxies=_config.proxies) if response.status_code == 200: rjson = response.json() logger.debug( "Retrieved " + str(rjson["count"]) + " of " + str(rjson["total"]) + " people." ) else: logger.critical(response) raise Exception(response.content) for person in rjson["data"]: logger.debug( "%s %s (%s)" % (person["firstName"], person["lastName"], person["targetName"]) ) if person["lastName"] in ["[NO LAST NAME]", "NO LAST NAME"]: person["lastName"] = "" if person["firstName"] in ["[NO FIRST NAME]", "NO FIRST NAME"]: person["firstName"] = "" xm_people[person["targetName"]] = person # Only to backfill data # devs = get_devices_by_person(person['id']) # if not devs: # add_work_email_device(person) if "next" in rjson["links"]: url = _config.url + rjson["links"]["next"] else: break return xm_people def get_devices_by_person(person_id): """Gets a person's device(s) Parameters: ID (targetName or id) Returns: { "count":3, "total":3, "data":[ { "id":"a4d69579-f436-4e85-9d93-703714d85d72", "name":"Home Phone", "recipientType":"DEVICE", "phoneNumber":"+13235553643", "targetName":"akaur", "deviceType":"VOICE", [...] """ logger.info("\n") logger.info("Gathering all devices for %s" % person_id) url = _config.base_URL + "/people/" + person_id + "/devices" headers = {"Authorization": "Bearer " + get_access_token()} if USE_BASIC_AUTH: headers = {} xm_devices = [] while True: if USE_BASIC_AUTH: response = requests.get( url, auth=(_config.xm_username, _config.xm_password), proxies=_config.proxies, ) else: response = requests.get(url, headers=headers, proxies=_config.proxies) if response.status_code == 200: rjson = response.json() logger.debug( "Retrieved " + str(rjson["count"]) + " of " + str(rjson["total"]) + " devices." ) else: logger.critical(response) raise Exception(response.content) for device in rjson["data"]: logger.debug("Device - %s %s" % (device["name"], device["targetName"])) xm_devices.append(device) if "next" in rjson["links"]: url = _config.url + rjson["links"]["next"] else: break return xm_devices def add_work_email_device(xm_user): logger.info("Adding device %s to XMatters" % (xm_user["targetName"])) url = _config.base_URL + "/devices" if USE_BASIC_AUTH: headers = {"Content-Type": "application/json"} else: headers = { "Content-Type": "application/json", "Authorization": "Bearer " + get_access_token(), } if not re.search("@", xm_user["targetName"]): logger.error( "NOT adding device for %s because that ain't no email address!" % xm_user["targetName"] ) return False device_data = { "recipientType": "DEVICE", "deviceType": "EMAIL", "owner": xm_user["id"], "name": "Work Email", "emailAddress": xm_user["targetName"], "delay": 0, "priorityThreshold": "MEDIUM", "testStatus": "UNTESTED", } if USE_BASIC_AUTH: response = requests.post( url, headers=headers, auth=(_config.xm_username, _config.xm_password), data=json.dumps(device_data), proxies=_config.proxies, ) else: response = requests.post( url, headers=headers, data=json.dumps(device_data), proxies=_config.proxies ) if response.status_code == 201: return True else: logger.critical( "ERROR: something went wrong adding device for user %s" % (xm_user["targetName"]) ) logger.critical(response) logger.critical(response.content) raise Exception(response.content) # add_task site to xmatters # OLD API # https://help.xmatters.com/OnDemand/xmodwelcome/communicationplanbuilder/appendixrestapi.htm?cshid=apiGETsites#GETsites # def add_site(site): logger.info("Adding site %s to XMatters" % site) # TODO: move this to a config file or something? # fixup "bad" data if site["country"] == "United States of America": site["country"] = "United States" elif site["country"] == "Vietnam": site["country"] = "Viet Nam" elif site["country"] == "Czechia": site["country"] = "Czech Republic" elif site["country"] == "Tanzania": site["country"] = "Tanzania, United Republic of" elif site["country"] == "Korea, Republic of": site["country"] = "South Korea" if site["postal_code"] == "CZECH REPUBLIC": site["postal_code"] = "" if len(site["postal_code"]) > 10: site["postal_code"] = site["postal_code"][:10] if "timezone" not in site or "latitude" not in site or "longitude" not in site: (coords, tz) = Util.postal_to_coords_and_timezone( {"country": site["country"], "postal_code": site["postal_code"]} ) lat = coords[0] lng = coords[1] if tz is None: tz = "America/Los_Angeles" # arbitrary if lat is None: lat = 0 if lng is None: lng = 0 site_data = { "name": site["name"], "timezone": str(tz), # skip address for now as it's mostly bad data (remote sites) # 'address1': site['address'], "country": site["country"], "city": site["city"], # 'state': site['state'], "postalCode": site["postal_code"], "latitude": lat, "longitude": lng, "language": "English", "status": "ACTIVE", } sites_url = _config.base_URL_old_api + "sites" headers = {"Content-Type": "application/json"} response = requests.post( sites_url, auth=(_config.xm_username, _config.xm_password), headers=headers, data=json.dumps(site_data), proxies=_config.proxies, ) if response.status_code == 200 or response.status_code == 201: rjson = response.json() logger.debug(rjson) return site_data # for the unittests else: logger.critical("Could not create site") logger.critical(response.content) raise Exception(response.content) def set_site_inactive(xm_site_id): return set_site_status(xm_site_id, "INACTIVE") def set_site_active(xm_site_id): return set_site_status(xm_site_id, "ACTIVE") # OLD API # https://help.xmatters.com/OnDemand/xmodwelcome/communicationplanbuilder/appendixrestapi.htm?cshid=apiGETsites#GETsites # def set_site_status(xm_site_id, status): logger.info("Setting site %s to %s" % (xm_site_id, status)) site_data = { "status": status, } sites_url = _config.base_URL_old_api + "sites/" + xm_site_id headers = {"Content-Type": "application/json"} response = requests.post( sites_url, auth=(_config.xm_username, _config.xm_password), headers=headers, data=json.dumps(site_data), proxies=_config.proxies, ) if response.status_code == 200: rjson = response.json() logger.debug(rjson) return True else: logger.critical("Could not deactivate site") logger.critical(response.content) raise Exception(response.content) def add_new_sites(wd_sites, xm_sites, xm_sites_inactive, limit): logger.info("Adding new sites to XMatters") xm_sites_in_wd = {} num_changes = 0 for wd_site in wd_sites: if wd_site in xm_sites: logger.debug("WD site %s found in XMatters! No action." % wd_site) xm_sites_in_wd[wd_site] = 1 elif wd_site in xm_sites_inactive: if num_changes < limit: logger.info("WD site %s INACTIVE in XMatters! Reactivating." % wd_site) set_site_active(xm_sites_inactive[wd_site]) num_changes += 1 else: if num_changes < limit: logger.info( "WD site %s NOT found in XMatters! Adding to XMatters." % wd_site ) add_site(wd_sites[wd_site]) num_changes += 1 logger.info(f"Number of added or activated sites:{num_changes}") return xm_sites_in_wd def delete_sites(xm_sites, xm_sites_in_wd, limit): logger.info("\n") logger.info("Deleting empty sites from XMatters") num_changes = 0 for site in xm_sites: if site not in xm_sites_in_wd and site not in ["Default Site", "Mountain View Office"]: if num_changes < limit: logger.info( "Site %s not in WorkDay. INACTIVATING %s from XMatters" % (site, xm_sites[site]) ) set_site_inactive(xm_sites[site]) num_changes +=1 logger.info(f"Number of sites that were inactivated:{num_changes}") return True def sanitize_string_properties(input_str: str) -> str: """Remove known unwanted string patterns from string properties.""" known_patterns = [ "[C]", # Used to identify contractors in Workday "[c]", ] for pattern in known_patterns: input_str = input_str.replace(pattern, "") # remove possible whitespace from previous step input_str = input_str.strip() # keep only chars expected in international names return re.sub(r"[^A-Za-z0-9_À-ÿ ,.'-]", "", input_str, flags=re.U) # NEW API # https://help.xmatters.com/xmAPI/?python#modify-a-person # def update_user(wd_user, xm_user, xm_sites): logger.info( "Updating user %s (%s) in XMatters" % (xm_user["id"], xm_user["targetName"]) ) url = _config.base_URL + "/people" if USE_BASIC_AUTH: headers = {"Content-Type": "application/json"} else: headers = { "Content-Type": "application/json", "Authorization": "Bearer " + get_access_token(), } site_key = ( wd_user.get("User_Home_Country", "") + ":" + wd_user.get("User_Home_Postal_Code", "") ) person_data = { "id": xm_user["id"], "firstName": sanitize_string_properties( wd_user.get("User_Preferred_First_Name", "[NO FIRST NAME]") ), "lastName": sanitize_string_properties( wd_user.get("User_Preferred_Last_Name", "[NO LAST NAME]") ), "site": xm_sites[site_key], "properties": { "Home City": wd_user.get("User_Home_City", ""), "Home Country": wd_user.get("User_Home_Country", ""), "Home Zipcode": wd_user.get("User_Home_Postal_Code", "")[:20] }, } logger.debug("will upload this:") logger.debug(json.dumps(person_data)) if USE_BASIC_AUTH: response = requests.post( url, headers=headers, auth=(_config.xm_username, _config.xm_password), data=json.dumps(person_data), proxies=_config.proxies, ) else: response = requests.post( url, headers=headers, data=json.dumps(person_data), proxies=_config.proxies ) if response.status_code == 200: return person_data # for the unittest else: logger.critical( "ERROR: something went wrong updating user %s (%s)" % (xm_user["id"], xm_user["targetName"]) ) logger.critical(response) raise Exception(response.content) # NEW API # https://help.xmatters.com/xmAPI/?python#create-a-person # def add_user(wd_user, xm_sites): logger.info("Adding user %s to XMatters" % (wd_user["User_Email_Address"])) url = _config.base_URL + "/people" if USE_BASIC_AUTH: headers = {"Content-Type": "application/json"} else: headers = { "Content-Type": "application/json", "Authorization": "Bearer " + get_access_token(), } site_key = ( wd_user.get("User_Home_Country", "") + ":" + wd_user.get("User_Home_Postal_Code", "") ) person_data = { "firstName": sanitize_string_properties( wd_user.get("User_Preferred_First_Name", "[NO FIRST NAME]") ), "lastName": sanitize_string_properties( wd_user.get("User_Preferred_Last_Name", "[NO LAST NAME]") ), "targetName": wd_user["User_Email_Address"], "site": xm_sites[site_key], "recipientType": "PERSON", "status": "ACTIVE", "roles": ["Standard User"], "supervisors": [_config.supervisor_id], "properties": { "Home City": wd_user.get("User_Home_City", ""), "Home Country": wd_user.get("User_Home_Country", ""), "Home Zipcode": wd_user.get("User_Home_Postal_Code", "")[:20] }, } logger.debug("will upload this:") logger.debug(json.dumps(person_data)) if USE_BASIC_AUTH: response = requests.post( url, headers=headers, auth=(_config.xm_username, _config.xm_password), data=json.dumps(person_data), proxies=_config.proxies, ) else: response = requests.post( url, headers=headers, data=json.dumps(person_data), proxies=_config.proxies ) if response.status_code == 201: rjson = response.json() else: logger.critical( "ERROR: something went wrong adding user %s" % (wd_user["User_Email_Address"]) ) logger.critical(response) logger.critical(response.content) raise Exception(response.content) person_data["id"] = rjson["id"] add_work_email_device(person_data) return person_data # for the unittest # NEW API # https://help.xmatters.com/xmAPI/?python#delete-a-person # def actual_person_delete(target): logger.info("Sending DELETE request for %s" % target) url = _config.base_URL + "/people/" + target headers = {"Authorization": "Bearer " + get_access_token()} if USE_BASIC_AUTH: response = requests.delete( url, auth=(_config.xm_username, _config.xm_password), proxies=_config.proxies, ) else: response = requests.delete(url, headers=headers, proxies=_config.proxies) if response.status_code == 200: logger.info("Deleted person " + response.json().get("targetName")) return True elif response.status_code == 204: logger.warning("The person could not be found.") return False else: logger.critical("Could not delete person!") logger.critical(response) logger.critical(response.content) raise Exception(response.content) def delete_users(xm_users, users_seen_in_wd, limit): logger.info("\n") logger.info("Deleting old users from XMatters") num_changes = 0 for user in xm_users: if not re.search("@", user): # let's just skip any usernames that don't look like emails continue if user not in users_seen_in_wd: if num_changes < limit: logger.info("User %s not seen in workday, will delete from xmatters" % user) actual_person_delete(user) num_changes +=1 else: logger.info("User %s not seen in workday" % user) logger.info(f"Number of deleted users:{num_changes}") return True