jobs/eam-integrations/scripts/workday_everfi_integration.py (387 lines of code) (raw):
from workday_everfi.api.Workday import WorkdayAPI
from workday_everfi.api.Everfi import EverfiAPI
from api.util import Util, APIAdaptorException
import argparse
import logging
import sys
def cal_user_location(wd_user, locs, loc_map_table):
loc = ""
location_country = wd_user.get("location_country", "")
if location_country == "Canada":
loc = loc_map_table.get(wd_user.get("location_province", ""), "")
if not loc:
loc = "Federal (Canada)"
elif location_country == "United States of America":
# if wd_user.get("location_state", "") == "New York":
loc = loc_map_table.get(wd_user.get("location_state", ""), "")
if not loc:
loc = "United States"
else:
loc = "Default"
id = locs.get(loc)["id"]
if not id:
id = locs.get("Default")["Id"]
logger.debug(
f"Location id={id} mapped for user {wd_user.get('primary_work_email','')} loc = {loc}"
)
return id
class Everfi:
def __init__(self) -> None:
self.everfi_api = EverfiAPI()
self.logger = logging.getLogger(self.__class__.__name__)
def get_everfi_users(self, locs, loc_map_table, hire_dates):
filter = {'filter[active]': 'true'}
fields = {'fields[users]': 'email,first_name,last_name,sso_id,employee_id,student_id,location_id,active,user_rule_set_roles,category_labels'}
return self.everfi_api.get_users(fields, filter, locs, loc_map_table, hire_dates)
def deactivate_users(self, del_list, everfi_users):
count = 0
for email in del_list:
id = everfi_users[email].get('id')
self.everfi_api.deactivate_user(id)
if '@' in email:
n = email.split("@")[0]
else:
n = email
self.logger.info(f"{n[:4]} .. {n[-1]} deleted")
count += 1
if count % 20 == 0:
self.logger.info(f"[{count} of {len(del_list)}] users deactivated.")
return count
def activate_user(self, id):
self.everfi_api.set_active(id,True)
def get_locations_mapping_table(self):
return self.everfi_api.get_locations_mapping_table()
def upd_everfi_users(
self,
hire_date_category_id,
hire_dates,
locs,
upd_list_keys,
wd_users,
everfi_users,
loc_map_table,
):
errors_list = []
count_upd = 0
loc_id_dict = {x.get('id'):x.get('attributes').get('name') for x in locs.values()}
for email in upd_list_keys:
wd_user = wd_users[email][1]
loc_id = cal_user_location(wd_user, locs, loc_map_table)
if int(loc_id) != everfi_users[email].get('attributes').get('location_id'):
if '@' in email:
n = email.split("@")[0]
else:
n = email
self.logger.info(f"User {n[:4]} .. {n[-1]} changed location from {loc_id_dict[str(everfi_users[email].get('attributes').get('location_id'))]} to {loc_id_dict[loc_id]}")
json_data = {
"data": {
"type": "registration_sets",
"id": everfi_users[email]["id"],
"attributes": {
"registrations": [
{
"rule_set": "user_rule_set",
"first_name": wd_user.get("preferred_first_name", ""),
"last_name": wd_user.get("preferred_last_name", ""),
"location_id": loc_id,
"employee_id": wd_user.get("employee_id", ""),
"sso_id": email,
},
{
"rule_set": "cc_learner",
"role": "supervisor"
if wd_user.get("is_manager", "")
else "non_supervisor",
},
],
},
},
}
try:
r = self.everfi_api.upd_user(everfi_users[email]["id"], json_data)
except Exception as e:
self.logger.exception(e)
errors_list.append(e)
cat_label_user_id = self.get_category_label_user_id(everfi_users[email]["id"])
if cat_label_user_id:
self.delete_category_label_user(cat_label_user_id)
#wd_users[email][1]["hire_date"] = '2024-07-10'
hire_date_id = self.get_hire_date_id(
wd_users[email][1]["hire_date"], hire_date_category_id, hire_dates
)
try:
r = self.everfi_api.assign_label_user(
r.data.get("data").get("id"), hire_date_id
)
except Exception as e:
self.logger.exception(e)
errors_list.append(e)
if count_upd % 20 == 0:
self.logger.info(f"[{count_upd} of {len(upd_list_keys)}] users updated.")
count_upd += 1
return count_upd
def get_category_label_user_id(self, id):
ret = self.everfi_api.get_category_label_user_id(id)
if len(ret.data.get('data',''))>0:
return ret.data.get('data','')[0].get('id','')
else:
return None
def delete_category_label_user(self, id):
ret = self.everfi_api.delete_category_label_user(id)
return ret
def bulk_clear_category_id(self, ids, category_id,category_label):
return self.everfi_api.bulk_clear_category_id(ids, category_id,category_label)
def get_hire_date_id(self, wd_hire_date, hire_date_category_id, hire_dates):
wd_hire_date = wd_hire_date.split("-")
wd_hire_date = wd_hire_date[1] + "-" + wd_hire_date[0]
hire_date_id = hire_dates.get(wd_hire_date)
if not hire_date_id:
# add new hire date
r = self.everfi_api.add_hire_date(
name=wd_hire_date, category_id=hire_date_category_id
)
id = r.data.get("data").get("id")
hire_dates[wd_hire_date] = id
return r.data.get("data").get("id")
return hire_date_id
def add_everfi_users(
self,
hire_date_category_id,
hire_dates,
locs,
add_list_keys,
wd_users,
loc_map_table,
):
errors = []
count_add = 0
for email in add_list_keys:
wd_user = wd_users[email][1]
loc_id = cal_user_location(wd_user, locs, loc_map_table)
json_data = {
"data": {
"type": "registration_sets",
"attributes": {
"registrations": [
{
"rule_set": "user_rule_set",
"first_name": wd_user.get("preferred_first_name", ""),
"last_name": wd_user.get("preferred_last_name", ""),
"email": wd_user.get("primary_work_email", ""),
"sso_id": email,
"employee_id": wd_user.get("employee_id", ""),
"location_id": loc_id,
},
{
"rule_set": "cc_learner",
"role": "supervisor"
if wd_user.get("is_manager", "")
else "non_supervisor",
},
],
},
},
}
try:
r = self.everfi_api.add_user(json_data)
except Exception as e:
self.logger.exception(e)
self.logger.info("Trying to activate user and update ")
if (e.args[0][0].get('id','')=='user_rule_set'):
# try to active user
# find user by email and then update the user with current data
filter = {'filter[email]': wd_user.get("primary_work_email", "")}
fields = {'fields[users]': 'id,email'}
#find user id
user = self.everfi_api.search_user(fields, filter)
id = user.get(email,'').get('id', '')
if id:
#self.activate_user(id)
json_data['data']['id'] = id
json_data['data']['attributes']['registrations'][0]['active'] = True
#active user and update fields
r = self.everfi_api.upd_user(id, json_data)
#remove hire date custom field
#hd = wd_users[email][1]["hire_date"].split('-')
cat_label_user_id = self.get_category_label_user_id(id)
if cat_label_user_id:
self.delete_category_label_user(cat_label_user_id)
#self.bulk_clear_category_id([id], hire_date_category_id, hd[1] + '-' + hd[0])
else:
errors.append(e)
continue
#wd_users[email][1]["hire_date"] = '2024-07-10'
hire_date_id = self.get_hire_date_id(
wd_users[email][1]["hire_date"], hire_date_category_id, hire_dates
)
try:
r = self.everfi_api.assign_label_user(
r.data.get("data").get("id"), hire_date_id
)
except Exception as e:
self.logger.exception(e)
errors.append(e)
count_add += 1
if '@' in email:
n = email.split("@")[0]
else:
n = email
self.logger.info(f"{n[:4]} .. {n[-1]} added")
if count_add % 20 == 0:
self.logger.info(f"[{count_add} of {len(add_list_keys)}] users added.")
return count_add
class Workday:
def build_comparison_string(self, wd_row, locs, loc_map_table):
loc_id = cal_user_location(wd_row, locs, loc_map_table)
hire_date = wd_row['hire_date'].split('-')
is_manager = "supervisor" if wd_row.get("is_manager", "") else "non_supervisor"
return (
wd_row["primary_work_email"]
+ "|"
+ wd_row["preferred_first_name"]
+ "|"
+ wd_row["preferred_last_name"]
+ "|"
+ wd_row["employee_id"]
+ "|"
+ loc_id
+ "|"
+ is_manager
+ "|"
+ hire_date[1] + "-" + hire_date[0]
+ "|"
+ wd_row["primary_work_email"]
)
def get_wd_users(self, locs, loc_map_table):
import pandas as pd
import io
# The API is not returning all fields in the json
# but the csv is, so we will use the csv version
# wd_users_csv = WorkdayAPI.get_datawarehouse_workers_csv()
workday_api = WorkdayAPI()
wd_users_csv = workday_api.get_datawarehouse_workers_csv()
df = pd.read_csv(io.StringIO(wd_users_csv), sep=",")
filtered = df[
(df["currently_active"] == True)
& (df["moco_or_mofo"] == "MoCo")
& (df["worker_type"] == "Employee")
| (df['primary_work_email'] == "jmoscon@mozilla.com")
]
comp = {
x[1]["primary_work_email"]: self.build_comparison_string(
x[1], locs, loc_map_table
)
for x in filtered.iterrows()
}
return comp, {x[1]["primary_work_email"]: x for x in filtered.iterrows()}
class WorkdayEverfiIntegration:
def __init__(self) -> None:
self.workday = Workday()
self.everfi = Everfi()
self.logger = logging.getLogger(self.__class__.__name__)
def compare_users(self, wd_comp, everfi_comp, wd_users, everfi_users):
import numpy as np
add_list = []
del_list = []
upd_list = []
wd_users_emails = list(wd_users.keys())
everfi_users_emails = list(everfi_users.keys())
add_list = np.setdiff1d(wd_users_emails, everfi_users_emails)
del_list = np.setdiff1d(everfi_users_emails, wd_users_emails)
intersect_list = np.intersect1d(wd_users_emails, everfi_users_emails)
for upd_email in intersect_list:
if everfi_comp[upd_email] != wd_comp[upd_email]:
upd_list.append(upd_email)
return add_list, del_list, upd_list
def run(self, limit):
# ========================================================
# Getting Everfi hire dates, locations and locations mapping table ...
# ========================================================
try:
self.logger.info("Getting everfi hire dates")
hire_date_category_id, hire_dates = self.everfi.everfi_api.get_hire_dates()
self.logger.info(f"Number of hire dates: {len(hire_dates)}")
self.logger.info("Getting everfi locations")
locs = self.everfi.everfi_api.get_locations()
self.logger.info(f"Number of locations: {len(locs)}")
self.logger.info("Getting everfi mapping table")
loc_map_table = self.everfi.everfi_api.get_locations_mapping_table()
self.logger.info(f"Number of mappins: {len(loc_map_table)}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Getting Everfi hire dates,locations and locations mapping table ...")
sys.exit(1)
# ========================================================
# Getting Workday users...
# ========================================================
self.logger.info("Getting Workday users...")
try:
wd_comp, wd_users = self.workday.get_wd_users(locs, loc_map_table)
self.logger.info(f"Number of wd users: {len(wd_users)}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Getting Workday users...")
sys.exit(1)
# ========================================================
# Getting Everfi users...
# ========================================================
self.logger.info("Getting Everfi users...")
try:
everfi_comp, everfi_users = self.everfi.get_everfi_users(locs, loc_map_table, hire_dates)
self.logger.info(f"Number of Everfi users: {len(everfi_users)}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Getting Everfi users...")
sys.exit(1)
# ========================================================
# Comparing users...
# ========================================================
self.logger.info("Comparing users...")
try:
add_list, del_list, upd_list = integration.compare_users(
wd_comp, everfi_comp, wd_users, everfi_users
)
self.logger.info(f"Number of users to delete w/o limit={len(del_list)} with limit={len(del_list[:limit])}")
self.logger.info(f"Number of users to add w/o limit={len(add_list)} with limit={len(add_list[:limit])}")
self.logger.info(f"Number of users to update w/o limit={len(upd_list)} with limit={len(upd_list[:limit])}")
del_list = del_list[:limit]
add_list = add_list[:limit]
upd_list = upd_list[:limit]
except (Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Comparing users...")
sys.exit(1)
# ========================================================
# Deleting Everfi users ...
# ========================================================
self.logger.info("Deleting Everfi users ...")
try:
count_dels = self.everfi.deactivate_users(del_list, everfi_users)
self.logger.info(f"Number of users deleted {count_dels}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Faile while Deleting Everfi users ...")
sys.exit(1)
# ========================================================
# Adding Everfi users ...
# ========================================================
self.logger.info("Adding Everfi users ...")
try:
count_add = self.everfi.add_everfi_users(
hire_date_category_id, hire_dates, locs, add_list, wd_users, loc_map_table
)
self.logger.info(f"Number of users added {count_add}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Adding Everfi users ...")
sys.exit(1)
# ========================================================
# Updating Everfi users ...
# ========================================================
self.logger.info("Updating Everfi users ...")
try:
count_upd = self.everfi.upd_everfi_users(
hire_date_category_id,
hire_dates,
locs,
upd_list,
wd_users,
everfi_users,
loc_map_table,
)
self.logger.info(f"Number of users updated {count_upd}")
except (APIAdaptorException, Exception) as e:
self.logger.error(str(e))
self.logger.critical("Failed while Updating Everfi users ...")
sys.exit(1)
self.logger.info("End of integration")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Sync up XMatters with Workday")
parser.add_argument(
"-l",
"--level",
action="store",
help="log level (debug, info, warning, error, or critical)",
type=str,
default="info",
)
parser.add_argument(
"-m",
"--max_limit",
action="store",
type=int,
help="limit the number of changes in Everfi",
default=40
)
args = None
args = parser.parse_args()
log_level = Util.set_up_logging(args.level)
logger = logging.getLogger(__name__)
logger.info("Starting...")
logger.info(f"max_limit={args.max_limit}")
integration = WorkdayEverfiIntegration()
integration.run(args.max_limit)