jobs/eam-integrations/scripts/workday_netsuite_integration.py (788 lines of code) (raw):
from argparse import ArgumentParser
import logging
import re
import sys
from datetime import datetime
import json
from enum import Enum
from workday_netsuite.api.workday import WorkDayRaaService, Worker, InternationalTransfer
from workday_netsuite.api.netsuite import NetSuiteRestlet
from api.util import Util, APIAdaptorException
from workday_netsuite.api.netsuite import NetSuiteRestletException
class Operations(Enum):
    """Kinds of synchronisation step the integration can perform.

    The numeric values are identifiers only; the order in which steps run is
    decided by the caller, not by these numbers.
    """

    # Push changed attributes of employees present in both systems.
    update_employee = 1
    # Create employees that exist in Workday but not yet in NetSuite.
    add_new_hire = 2
    # Create new employees who are managers of other new employees.
    add_new_manager = 3
    # Move an employee to a different company/country.
    international_transfer = 4
    # Re-activate an employee that was hired again.
    rehired = 5
def fix_none(x):
    """Normalise a field value for use in a '|'-separated comparison string.

    Args:
        x: The raw value. Usually a string, but ``None`` and other scalars
            are accepted.

    Returns:
        ``''`` when ``x`` is falsy or is the ``'- None -'`` placeholder
        (with or without surrounding whitespace); otherwise the value as a
        string with leading/trailing whitespace removed.
    """
    if not x:
        return ''
    # Non-string values (e.g. a numeric class id) are converted instead of
    # failing on the missing ``strip`` method.
    text = x.strip() if isinstance(x, str) else str(x).strip()
    # Compare after stripping so a padded placeholder is recognised too.
    return '' if text == '- None -' else text
class Workday:
    """Fetches worker data from Workday reports and prepares it for comparison."""

    def __init__(self,) -> None:
        self.workday_service = WorkDayRaaService()
        self.mapping = DataMapping()

    def get_international_transfers(self, ns_workers, workers_dict):
        """Return the transfers that are not yet reflected in NetSuite.

        The Workday report is queried from January 1st two years ago up to
        today. A transfer is kept when the company derived from the worker's
        current Workday country differs from the company on the NetSuite record.

        Args:
            ns_workers: Mapping of employee id -> NetSuite employee record (dict).
            workers_dict: Mapping of employee id -> raw Workday worker (dict).

        Returns:
            list of ``InternationalTransfer`` objects still to be applied.
        """
        today = datetime.today()
        start_date = f"{today.year-2}-01-01"
        end_date = today.strftime('%Y-%m-%d')
        wd_data = self.workday_service.get_international_transfers(start_date, end_date)

        worker_list = []
        for worker in wd_data["Report_Entry"]:
            intern_transfer = InternationalTransfer(**worker)
            ns_worker = ns_workers.get(intern_transfer.Employee_ID)
            wd_worker = workers_dict.get(intern_transfer.Employee_ID)
            # Without both sides there is nothing to compare. Previously a
            # worker missing from workers_dict raised AttributeError here and
            # aborted the whole step.
            if not ns_worker or not wd_worker:
                continue
            # Normalise the country name first (as build_comparison_string
            # does) so e.g. "Czechia" resolves to its company.
            wd_country = self.mapping.map_country(wd_worker.get('Country', ''))
            if self.mapping.map_company(wd_country) != ns_worker.get('Company'):
                worker_list.append(intern_transfer)
        return worker_list

    def get_listing_of_workers(self):
        """Fetch all workers from the Workday listing report.

        Each raw entry has its ``'Cost_Center_-_ID'`` key renamed to
        ``'Cost_Center_ID'`` (the raw dicts are modified in place).

        Returns:
            tuple ``(worker_list, worker_dict, wd_comp)``: the ``Worker``
            objects, the raw dicts keyed by employee id, and the comparison
            strings keyed by employee id.

        Raises:
            KeyError: If an entry lacks ``'Cost_Center_-_ID'`` or ``'Employee_ID'``.
        """
        wd_data = self.workday_service.get_listing_of_workers()
        worker_dict = {}
        worker_list = []
        wd_comp = {}
        for worker in wd_data["Report_Entry"]:
            worker['Cost_Center_ID'] = worker.pop('Cost_Center_-_ID')
            worker_list.append(Worker(**worker))
            worker_dict[worker['Employee_ID']] = worker
            wd_comp[worker['Employee_ID']] = self.build_comparison_string(worker)
        return worker_list, worker_dict, wd_comp

    @staticmethod
    def _display_names(wd_worker):
        """Return ``(first_name, last_name)`` for a raw Workday worker dict.

        The preferred full name is split on its last space, unless it is
        missing or contains characters in the U+4E00-U+9FFF range, in which
        case the separate first/last name fields are used.
        """
        preferred = wd_worker.get('Preferred_Full_Name')
        if not preferred or re.findall(r'[\u4e00-\u9fff]+', preferred):
            return wd_worker.get('First_Name'), wd_worker.get('Last_Name')
        parts = preferred.split(' ')
        return ' '.join(parts[:-1]), parts[-1]

    def build_comparison_string(self, wd_worker):
        """Build the '|'-separated fingerprint of a Workday worker.

        Args:
            wd_worker: Raw Workday worker dict.

        Returns:
            str: 18 normalised fields joined by ``'|'``; compared elsewhere
            against the string built from the matching NetSuite record.
        """
        ns_country = self.mapping.map_country(wd_worker.get('Country', ''))

        # Try Province if State is empty ("or ''" also covers a None State).
        state = wd_worker.get('State') or ''
        if state.strip() == "":
            state = wd_worker.get('Province', '')

        first_name, last_name = self._display_names(wd_worker)

        # An unmapped product used to be rendered as the literal text 'None'
        # (str(None)); emit an empty field instead.
        product_class = self.mapping.product_class_map_dict.get(wd_worker.get('Product', ''))
        product_class = '' if product_class is None else str(product_class)

        fields = [
            wd_worker.get('Employee_ID', ''),
            wd_worker.get('Employee_Type', ''),
            wd_worker.get('Most_Recent_Hire_Date', ''),
            self.mapping.map_company(ns_country),
            wd_worker.get('Manager_ID', ''),
            wd_worker.get('Cost_Center_ID', ''),
            wd_worker.get('primaryWorkEmail', ''),
            first_name,
            last_name,
            ns_country,
            wd_worker.get('Employee_Status', ''),
            wd_worker.get('Primary_Address', ''),
            state,
            wd_worker.get('City', ''),
            wd_worker.get('Postal', ''),
            self.mapping.map_payment_method(ns_country),
            self.mapping.map_currency(ns_country),
            product_class,
        ]
        return "|".join(fix_none(value) for value in fields)
class DataMapping():
    """Static lookup tables translating Workday values into NetSuite values.

    The tables are class-level constants so they are built once rather than
    on every call. Only ``product_class_map_dict`` is loaded at run time.
    """

    # Country / region / office name -> two-letter country code.
    _COUNTRY_CODES = {
        'Albania': 'AL',
        'American Samoa': 'AS',
        'Argentina': 'AR',
        'Armenia': 'AM',
        'Azerbaijan': 'AZ',
        'Bahrain': 'BH',
        'Barbados': 'BB',
        'Belarus': 'BY',
        'Bolivia': 'BO',
        'Botswana': 'BW',
        'Brazil': 'BR',
        'Cameroon': 'CM',
        'Chile': 'CL',
        'Colombia': 'CO',
        'Costa Rica': 'CR',
        'Dominican Republic': 'DO',
        'Ecuador': 'EC',
        'Egypt': 'EG',
        'El Salvador': 'SV',
        'Georgia': 'GE',
        'Guam': 'GU',
        'Guatemala': 'GT',
        'Honduras': 'HN',
        'Hong Kong': 'HK',
        'India': 'IN',
        'Indonesia': 'ID',
        'Israel': 'IL',
        'Jamaica': 'JM',
        'Japan': 'JP',
        'Jordan': 'JO',
        'Kazakhstan': 'KZ',
        'Kenya': 'KE',
        'Korea, Republic of': 'KR',
        'Kosovo': 'XK',
        'Kuwait': 'KW',
        'Kyrgyzstan': 'KG',
        'Malaysia': 'MY',
        'Mauritius': 'MU',
        'Mexico': 'MX',
        'Moldova, Republic of': 'MD',
        'Mozambique': 'MZ',
        'Namibia': 'NA',
        'Nicaragua': 'NI',
        'Nigeria': 'NG',
        'Pakistan': 'PK',
        'Peru': 'PE',
        'Philippines': 'PH',
        'Puerto Rico': 'PR',
        'Qatar': 'QA',
        'ROW': 'US',
        'Asia Pacific (APAC)': 'US',
        'Australia': 'AU',
        'China': 'CN',
        'New Zealand': 'NZ',
        'Taiwan': 'TW',
        'EMEA': 'US',
        'Austria': 'AT',
        'Belgium': 'BE',
        'Bosnia and Herzegovina': 'BA',
        'Bulgaria': 'BG',
        'Croatia': 'HR',
        'Cyprus': 'CY',
        'Czech Republic': 'CZ',
        'Denmark': 'DK',
        'Estonia': 'EE',
        'Finland': 'FI',
        'France': 'FR',
        'Germany': 'DE',
        'Berlin Office': 'DE',
        'Greece': 'GR',
        'Hungary': 'HU',
        'Iceland': 'IS',
        'Ireland': 'IE',
        'Italy': 'IT',
        'Latvia': 'LV',
        'Lithuania': 'LT',
        'Luxembourg': 'LU',
        'Macedonia, Republic of North': 'MK',
        'Malta': 'MT',
        'Netherlands': 'NL',
        'Norway': 'NO',
        'Poland': 'PL',
        'Portugal': 'PT',
        'Romania': 'RO',
        'Russian Federation': 'RU',
        'Serbia': 'RS',
        'Slovakia': 'SK',
        'Slovenia': 'SI',
        'Spain': 'ES',
        'Sweden': 'SE',
        'Switzerland': 'CH',
        'United Kingdom': 'GB',
        'Non-US Americas': 'US',
        'Canada': 'CA',
        'Toronto Office': 'CA',
        'Saint Lucia': 'LC',
        'Saudi Arabia': 'SA',
        'Senegal': 'SN',
        'Singapore': 'SG',
        'South Africa': 'ZA',
        'Thailand': 'TH',
        'Turkey': 'TR',
        'Uganda': 'UG',
        'Ukraine': 'UA',
        'United Arab Emirates': 'AE',
        'Uruguay': 'UY',
        'USA': 'US',
        'United States': 'US',
        'San Francisco Office': 'US',
        'Uzbekistan': 'UZ',
        'Vietnam': 'VN',
        'Zimbabwe': 'ZW',
    }

    # Workday country names that are spelled differently on the NetSuite side.
    _COUNTRY_ALIASES = {
        "United States of America": "United States",
        "Czechia": "Czech Republic",
    }

    # Upper-cased company name -> company id.
    # NOTE(review): 'MZ Denmark ApS' used to be listed twice ('35', then
    # '21'). The later entry always won, so '21' is what is kept here;
    # confirm whether '35' was meant for a differently named company.
    _COMPANY_IDS = {
        'Moz 2008 Corporation (Australia)'.upper(): '27',
        'Mozilla Corporation'.upper(): '1',
        'MZ Denmark ApS, Belgium Branch'.upper(): '22',
        'MZ Canada Internet ULC (Canada)'.upper(): '30',
        'MZ Denmark ApS, filall Finland'.upper(): '24',
        'MZ Denmark (France)'.upper(): '32',
        'MZ Denmark GmbH (Germany)'.upper(): '33',
        'MZ Netherlands B.V.'.upper(): '41',
        'Moz 2008 Corporation (New Zealand)'.upper(): '26',
        'MZ Denmark ApS'.upper(): '21',
        'MZ Denmark ApS, Sucursal en Espana (Spain)'.upper(): '36',
        'MZ Denmark ApS Danmark) filial (Sweden)'.upper(): '37',
        'Moz 2008 Corporation (Taiwan)'.upper(): '28',
        'MZ Denmark (UK)'.upper(): '38',
    }

    # Country -> (company name, company id).
    # NOTE(review): the Spain name lacks the 'MZ ' prefix used in
    # _COMPANY_IDS; left untouched because the expected spelling cannot be
    # confirmed from this file.
    _COMPANIES = {
        'Australia': ('Moz 2008 Corporation (Australia)', '27'),
        'Austria': ('Mozilla Corporation', '1'),
        'Belgium': ('MZ Denmark ApS, Belgium Branch', '22'),
        'Canada': ('MZ Canada Internet ULC (Canada)', '30'),
        'Czech Republic': ('Mozilla Corporation', '1'),
        'Finland': ('MZ Denmark ApS, filall Finland', '24'),
        'France': ('MZ Denmark (France)', '32'),
        'Germany': ('MZ Denmark GmbH (Germany)', '33'),
        'Greece': ('Mozilla Corporation', '1'),
        'Italy': ('Mozilla Corporation', '1'),
        'Netherlands': ('MZ Netherlands B.V.', '41'),
        'New Zealand': ('Moz 2008 Corporation (New Zealand)', '26'),
        'Poland': ('MZ Denmark ApS', '21'),
        'Spain': ('Denmark ApS, Sucursal en Espana (Spain)', '36'),
        'Sweden': ('MZ Denmark ApS Danmark) filial (Sweden)', '37'),
        'Taiwan': ('Moz 2008 Corporation (Taiwan)', '28'),
        'United Kingdom': ('MZ Denmark (UK)', '38'),
        'United States': ('Mozilla Corporation', '1'),
        'United States of America': ('Mozilla Corporation', '1'),
        'Denmark': ('MZ Denmark ApS', '21'),
    }

    # Country -> payment method.
    _PAYMENT_METHODS = {
        "Belgium": "SEPA",
        "Finland": "SEPA",
        "France": "SEPA",
        "Germany": "SEPA",
        "Netherlands": "SEPA",
        "Poland": "SEPA",
        "Spain": "SEPA",
        "Sweden": "SEPA",
        "Denmark": "SEPA",
        "Canada": "SEPA",
        "Austria": "ACH",
        "Czech Republic": "ACH",
        "Greece": "ACH",
        "Italy": "ACH",
        "United States": "ACH",
        "United Kingdom": "BACS",
        "Australia": "Wire",
        "New Zealand": "Wire",
    }

    # Country -> default expense-report currency.
    _CURRENCIES = {
        "Belgium": "EUR",
        "Finland": "EUR",
        "France": "EUR",
        "Germany": "EUR",
        "Netherlands": "EUR",
        "Spain": "EUR",
        "Australia": "AUD",
        "Canada": "CAD",
        "Poland": "DKK",
        "Denmark": "DKK",
        "United Kingdom": "GBP",
        "New Zealand": "NZD",
        "Sweden": "SEK",
        "Austria": "USD",
        "Czech Republic": "USD",
        "Greece": "USD",
        "Italy": "USD",
        "United States": "USD",
    }

    # Product name -> hard-coded class id.
    _PRODUCT_CLASSES = {
        "Advertising": 8,
        "Emails": 113,
        "Emails Dedicated": 114,
        "Emails Standard": 14,
        "Fakespot": 130,
        "In-App/Web": 15,
        "MDN Advertising": 126,
        "Native Desktop": 110,
        "Native Mobile": 129,
        "Tiles Desktop": 11,
        "Tiles Direct Sell": 108,
        "Tiles Mobile": 112,
        "Business Support": 27,
        "All-Hands 2023": 104,
        "All-Hands 2024": 133,
        "China": 24,
        "Content": 134,
        "Firefox Other": 26,
        "Hubs Other": 25,
        "Innovation BI": 118,
        "Innovation General": 119,
        "Innovation MEICO": 116,
        "Innovation Mradi": 111,
        "Innovation Studio": 120,
        "MozSocial": 132,
        "Pocket Other": 121,
        "Firefox ESR": 4,
        "Keyword Search Desktop": 2,
        "Keyword Search Mobile": 3,
        "Suggest Desktop": 9,
        "Suggest Mobile": 10,
        "Vertical Desktop": 6,
        "Vertical Mobile": 7,
        "FPN": 20,
        "Hubs Subscription": 107,
        "MDN Subscription": 22,
        "Monitor": 128,
        "Pocket Premium": 17,
        "PXI Other": 18,
        "Relay": 21,
        "Relay Bundle Email": 106,
        "Relay Bundle Phone": 122,
        "VPN": 19,
        "VPN Relay Bundle": 105,
        "VPN Relay Bundle Email": 124,
        "VPN Relay Bundle Phone": 125,
        "VPN Relay Bundle VPN": 123,
    }

    def __init__(self):
        """Load the product -> class mapping (externalId -> internalId) from NetSuite."""
        ret = NetSuiteRestlet().get_product_class_mapping()
        self.product_class_map_dict = {x.get('externalId'): x.get('internalId')
                                       for x in ret.data}

    def map_country_codes(self, ns_country):
        """Return the two-letter code for a country/region/office name, or ``''``."""
        return self._COUNTRY_CODES.get(ns_country, '')

    def map_country(self, country):
        """Translate a Workday country name to its NetSuite spelling.

        Names without a known alias are returned unchanged.
        """
        return self._COUNTRY_ALIASES.get(country, country)

    def map_company_id(self, company):
        """Return the id for a company name (case-insensitive), or ``''``.

        A missing/empty ``company`` yields ``''`` instead of raising.
        """
        if not company:
            return ''
        return self._COMPANY_IDS.get(company.upper(), '')

    def map_company(self, country, index=1):
        """Return the company associated with a country.

        Args:
            country: Country name (NetSuite spelling).
            index: ``0`` for the company name, ``1`` (default) for its id.

        Returns:
            The requested element, or ``''`` when the country is unknown.
        """
        entry = self._COMPANIES.get(country)
        if not entry:
            return ''
        return entry[index]

    def map_payment_method(self, country):
        """Return the payment method for a country, or ``None`` when unmapped."""
        return self._PAYMENT_METHODS.get(self.map_country(country))

    def map_currency(self, country):
        """Return the default currency for a country, or ``None`` when unmapped.

        The name is normalised with ``map_country`` first, consistent with
        ``map_payment_method``, so Workday spellings are accepted as well.
        """
        return self._CURRENCIES.get(self.map_country(country))

    def map_class(self, product):
        """Return the hard-coded class id for a product, or ``None`` when unknown."""
        return self._PRODUCT_CLASSES.get(product)
class NetSuite():
    """Reads employees from NetSuite and pushes Workday changes to it."""

    # Employee types whose new records are always filed under class 27.
    _CONTRACTOR_TYPES = ('Elance Contractor', 'Independent Contractor', 'Vendor')

    # Employee fields copied verbatim into every report row, in output order.
    _REPORT_FIELDS = (
        "External ID",
        "Employee ID",
        "Last Name",
        "First Name",
        "Original Hire Date",
        "Most Recent Hire Date",
        "Termination Date",
        "Employee Type",
        "Employee Status - Active?",
        "Email - Primary Work",
        "Manager ID",
        "Cost Center - ID",
        "Address1",
        "Address2",
        "State",
        "City",
        "Zipcode",
        "Country",
        "CountryName",
        "Company",
        "DEFAULT CURRENCY FOR EXP. REPORT",
        "Payment Method",
        "Class",
        "newEmployee",
        "Rehire",
        "InternationalTransfer",
        "oldCountryCode",
        "oldCountryName",
    )

    def __init__(self) -> None:
        self.ns_restlet = NetSuiteRestlet()
        self.mapping = DataMapping()
        self.logger = logging.getLogger(self.__class__.__name__)
        # (payload, result, timestamp) tuples collected by update() and
        # flushed by post_error_report(); successes are recorded as well.
        self.error_lst = []

    def get_product_class_map(self, product):
        """Return the class id mapped to ``product``, or ``''`` when unmapped."""
        return self.mapping.product_class_map_dict.get(product, '')

    def format_date(self, date_str):
        """Convert ``'M/D/Y'`` into ``'Y-M-D'``; return ``''`` if that is not possible.

        The components are re-ordered as-is; no zero padding is applied.
        """
        try:
            month, day, year = date_str.split('/')[:3]
        except (AttributeError, TypeError, ValueError):
            # Not a string, or fewer than three '/'-separated parts.
            return ""
        return f"{year}-{month}-{day}"

    def build_comparison_string(self, ns_worker):
        """Build the '|'-separated fingerprint of a NetSuite employee record.

        Args:
            ns_worker: NetSuite employee record (dict).

        Returns:
            str: 18 fields joined by ``'|'``, in the same order as the
            string built for the matching Workday worker.
        """
        def text(value):
            # The last three fields used to be concatenated raw, which raised
            # TypeError on a missing value; normalise them like the others.
            return '' if value is None else fix_none(str(value))

        status = '1' if ns_worker.get("Employee Status - Active?", '') == "Actively Employed" else '2'
        fields = [
            # The id is always derived from 'Employee ID'; 'External ID' is
            # not used for the comparison.
            self.extract_employee_id(ns_worker.get('Employee ID')),
            fix_none(ns_worker.get('Employee Type', '')),
            self.format_date(fix_none(ns_worker.get('Original Hire Date', ''))),
            fix_none(ns_worker.get('Company', '')),
            fix_none(self.extract_employee_id(ns_worker.get('Manager ID', ''))),
            fix_none(ns_worker.get('Cost Center ID', '')),
            fix_none(ns_worker.get('Email - Primary Work', '')),
            fix_none(ns_worker.get('First Name', '')),
            fix_none(ns_worker.get('Last Name', '')),
            fix_none(ns_worker.get('Country', '')),
            status,
            fix_none(ns_worker.get('Address1', '')),
            fix_none(ns_worker.get('State', '')),
            fix_none(ns_worker.get('City', '')),
            fix_none(ns_worker.get('Zipcode', '')),
            text(ns_worker.get("DEFAULT CURRENCY FOR EXP. REPORT")),
            text(ns_worker.get("Payment Method")),
            text(ns_worker.get("Class")),
        ]
        return "|".join(fields)

    def extract_employee_id(self, employee_id):
        """Return the id part of a NetSuite 'Employee ID' style value.

        The leading run of digits is returned when there is one; otherwise
        the first space-separated token. ``None``/empty input yields ``''``.
        """
        if not employee_id:
            return ''
        leading_digits = re.match(r'\d+', employee_id)
        if leading_digits:
            return leading_digits.group(0)
        return fix_none(employee_id).split(' ')[0].strip()

    def get_employees(self):
        """Fetch the actively employed NetSuite employees.

        Returns:
            tuple ``(records, comparison_strings)``, both keyed by the id
            extracted from each record's ``'Employee ID'``.
        """
        ret = self.ns_restlet.get_employees()
        ret_active = {
            self.extract_employee_id(record.get('Employee ID')): record
            for record in ret.data
            if record.get('Employee Status - Active?') == 'Actively Employed'
        }
        ns_comp = {
            employee_id: self.build_comparison_string(record)
            for employee_id, record in ret_active.items()
        }
        return ret_active, ns_comp

    def compare_users(self, wd_comp, ns_comp):
        """Split employee ids into additions, updates and deletions.

        Args:
            wd_comp: Employee id -> Workday comparison string.
            ns_comp: Employee id -> NetSuite comparison string.

        Returns:
            tuple ``(add_list, upd_list, del_list)``: ids only in Workday
            (numpy array), ids in both whose strings differ (list), and ids
            only in NetSuite (numpy array).
        """
        import numpy as np

        wd_ids = list(wd_comp.keys())
        ns_ids = list(ns_comp.keys())
        add_list = np.setdiff1d(wd_ids, ns_ids)
        del_list = np.setdiff1d(ns_ids, wd_ids)
        upd_list = [
            employee_id
            for employee_id in np.intersect1d(wd_ids, ns_ids)
            if wd_comp.get(employee_id, '') != ns_comp.get(employee_id, '')
        ]
        return add_list, upd_list, del_list

    def _error_report_row(self, employee, error_description, row_id, operation):
        """Build one report row for a single employee payload."""
        row = {
            "row_id": row_id,
            "error_description": error_description,
            "operation": operation,
        }
        row.update((field, employee.get(field)) for field in self._REPORT_FIELDS)
        # Two fields have non-None defaults.
        row["Class"] = employee.get("Class", "")
        row["InternationalTransfer"] = employee.get("InternationalTransfer") or False
        return row

    def post_error_report(self, operation):
        """Send the outcome of the collected update attempts to NetSuite.

        Every employee of every entry in ``self.error_lst`` becomes one row;
        all rows are posted in a single call, then the list is cleared.
        Nothing is posted when there are no rows.

        Args:
            operation: Label stored on each row to identify the step.
        """
        # NOTE(review): all rows of one update() call share the same row_id
        # (its timestamp); confirm the receiving side does not need it to be
        # unique per row.
        output_data = [
            self._error_report_row(employee, error_description, row_id, operation)
            for employee_data, error_description, row_id in self.error_lst
            for employee in employee_data.get('employees', [])
        ]
        # Rows are accumulated; the previous code rebuilt the list on every
        # iteration and therefore only ever reported the last employee.
        if output_data:
            self.ns_restlet.post_error_report(output_data)
        self.error_lst = []

    @staticmethod
    def _display_names(wd_worker):
        """Return ``(first_name, last_name)`` for a Workday worker object.

        The preferred full name is split on its last space, unless it is
        missing or contains characters in the U+4E00-U+9FFF range, in which
        case the separate first/last name attributes are used.
        """
        preferred = wd_worker.Preferred_Full_Name
        if not preferred or re.findall(r'[\u4e00-\u9fff]+', preferred):
            return wd_worker.First_Name, wd_worker.Last_Name
        parts = preferred.split(' ')
        return ' '.join(parts[:-1]), parts[-1]

    def update(self, wd_workers,
               workers_dict,
               newEmployee = False,
               reHire = False,
               internationalTransfer = False,
               ns_workers = None,
               wd_comp=None,
               ns_comp=None,
               operation=None,
               max_limit=1
               ):
        """Send the given Workday workers to NetSuite, one request per worker.

        Each attempt that returns is recorded in ``self.error_lst`` (with
        ``'success'`` or the returned error). Attempts that raise are only
        logged. Processing stops once ``max_limit`` workers have been
        updated successfully.

        Args:
            wd_workers: Iterable of Workday worker objects to send.
            workers_dict: Accepted for call compatibility; not used here.
            newEmployee: Forwarded in the payload; also forces class 27 for
                contractor-type workers.
            reHire: Marks the payload as a rehire and omits the termination date.
            internationalTransfer: Marks the payload as a transfer and adds
                the previous country taken from ``ns_workers``.
            ns_workers: Employee id -> NetSuite record; read only for transfers.
            wd_comp: Accepted for call compatibility; not used here.
            ns_comp: Accepted for call compatibility; not used here.
            operation: Accepted for call compatibility; not used here.
            max_limit: Maximum number of successful updates per call.

        Raises:
            KeyError: For a transfer whose worker is missing from ``ns_workers``.
        """
        time_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        num_updates = 0

        for wd_worker in wd_workers:
            ns_country = self.mapping.map_country(wd_worker.Country)
            first_name, last_name = self._display_names(wd_worker)

            # set new contractors' product as 27
            if newEmployee and wd_worker.Employee_Type in self._CONTRACTOR_TYPES:
                class_ = 27  # Business Support
            else:
                class_ = self.get_product_class_map(wd_worker.Product)

            if internationalTransfer:
                old_country_name = ns_workers[wd_worker.Employee_ID].get('Country')
                old_country_code = self.mapping.map_country_codes(old_country_name)
            else:
                old_country_name = None
                old_country_code = None

            # A worker listed as their own manager is sent without a manager.
            manager_id = None if wd_worker.Manager_ID == wd_worker.Employee_ID else wd_worker.Manager_ID

            employee_data = {
                "employees": [
                    {
                        "External ID": wd_worker.Employee_ID,
                        "Employee ID": f"{wd_worker.Employee_ID}",
                        "Last Name": last_name,
                        "First Name": first_name,
                        "Legal Name": f"{wd_worker.First_Name} {wd_worker.Last_Name}",
                        "Original Hire Date": wd_worker.Most_Recent_Hire_Date,
                        "Most Recent Hire Date": wd_worker.Most_Recent_Hire_Date,
                        "Termination Date": wd_worker.termination_date if not reHire else None,
                        "Employee Type": wd_worker.Employee_Type,
                        "Employee Status - Active?": 'Actively Employed' if wd_worker.Employee_Status == '1' else 'Terminated',
                        "Email - Primary Work": wd_worker.primaryWorkEmail,
                        "Manager ID": manager_id,
                        "Cost Center - ID": wd_worker.Cost_Center_ID,
                        "Address1": wd_worker.Primary_Address,
                        "Address2": None,
                        "State": wd_worker.State if wd_worker.State else wd_worker.Province,
                        "City": wd_worker.City,
                        "Zipcode": wd_worker.Postal,
                        "Country": self.mapping.map_country_codes(ns_country),
                        "CountryName": ns_country,
                        "Company": self.mapping.map_company(ns_country),
                        "DEFAULT CURRENCY FOR EXP. REPORT": self.mapping.map_currency(ns_country),
                        "Payment Method": self.mapping.map_payment_method(ns_country),
                        "Class": class_,
                        "newEmployee": newEmployee,
                        "Rehire": True if reHire else None,
                        "InternationalTransfer": True if internationalTransfer else None,
                        "oldCountryCode": old_country_code,
                        "oldCountryName": old_country_name,
                    }
                ]
            }

            try:
                ret = self.ns_restlet.update(employee_data)
                if len(ret) == 0:
                    num_updates += 1
                    self.error_lst.append((employee_data, 'success', time_stamp))
                else:
                    self.error_lst.append((employee_data, ret, time_stamp))
                    self.logger.info(f"Error while updating Employee ID:{wd_worker.Employee_ID} error:{ret}")
                if num_updates >= max_limit:
                    break
            except NetSuiteRestletException as e:
                self.logger.info(f"Employee ID:{wd_worker.Employee_ID} ")
                self.logger.info(f"error {e.args[0].data}")
            except Exception as e:
                # Best effort: one failing worker must not stop the batch.
                self.logger.info(f"error {e}")
class WorkdayToNetsuiteIntegration():
    """Integration class for syncing employee data from Workday to NetSuite.

    The constructor takes no arguments; it creates the ``Workday`` and
    ``NetSuite`` helpers used by ``run``.
    """

    def __init__(self,) -> None:
        self.workday = Workday()
        self.netsuite = NetSuite()
        self.logger = logging.getLogger(self.__class__.__name__)

    def compare_dates(self, date1: str, date2: str) -> bool:
        """Return True when ``date1`` is later than ``date2``.

        Args:
            date1: Date formatted as ``YYYY-MM-DD``.
            date2: Date formatted as ``YYYY-MM-DD``; may be empty or ``None``.

        Returns:
            ``True`` if ``date2`` is missing, otherwise ``date1 > date2``.

        Raises:
            ValueError: If a provided date does not match ``YYYY-MM-DD``.
        """
        # The emptiness check has to happen before parsing: strptime raises
        # on an empty value, so testing the parsed object was unreachable.
        if not date2:
            return True
        return datetime.strptime(date1, "%Y-%m-%d") > datetime.strptime(date2, "%Y-%m-%d")

    def transform_data(self, input_tuple):
        """Flatten ``(payload, error, timestamp)`` tuples and save them as JSON.

        Each row gets a ``row_id`` of ``"<timestamp>_<1-based position>"``
        and is merged with the first employee of the payload. The rows are
        written through ``convert_to_json`` using its default file name.
        """
        result = []
        for index, (employee_data, error_description, timestamp) in enumerate(input_tuple, start=1):
            row_id = f"{timestamp}_{index}"
            result.append(
                {'row_id': row_id, 'error_description': error_description} | employee_data['employees'][0]
            )
        return self.convert_to_json(result)

    def convert_to_json(self, data, filename="output.json"):
        """Write ``data`` as indented UTF-8 JSON to ``filename``.

        Failures are printed rather than raised.

        Parameters:
        - data (list): The Python data structure to convert, typically a list of dictionaries.
        - filename (str): The name of the file to save the JSON data (default is "output.json").
        """
        try:
            with open(filename, 'w', encoding='utf-8') as file:
                json.dump(data, file, indent=4, ensure_ascii=False)
            print(f"Data successfully saved to {filename}")
        except Exception as e:
            print(f"An error occurred while saving to JSON: {e}")

    def run(self, max_limit):
        """Run all the steps of the integration.

        Steps 1 and 2 (loading data) terminate the process on failure.
        Steps 4 to 7 log their failure and let the remaining steps run.

        Args:
            max_limit: Maximum number of successful updates per update call.
        """
        operations = [
            Operations.update_employee,
            Operations.rehired,
            Operations.add_new_manager,
            Operations.add_new_hire,
            Operations.international_transfer,
        ]

        # ========================================================
        # Step 1: Getting Workday Data
        # ========================================================
        try:
            self.logger.info("Step 1: Getting Workday Data")
            wd_workers, workers_dict, wd_comp = self.workday.get_listing_of_workers()
            self.logger.info(f"Number of Worday employees {len(wd_workers)}.")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 1: Getting Workday Data")
            sys.exit(1)

        # ========================================================
        # Step 2: Getting NetSuite Data
        # ========================================================
        try:
            self.logger.info("Step 2: Getting NetSuite Data")
            ns_workers, ns_comp = self.netsuite.get_employees()
            # This count is the NetSuite one; the message used to say Workday.
            self.logger.info(f"Number of NetSuite employees {len(ns_workers)}.")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 2: Getting NetSuite Data")
            sys.exit(1)

        # ========================================================
        # Step 3: Compare Workday and Netsuite data
        # ========================================================
        self.logger.info("Step 3: Compare Workday and Netsuite data")
        add_list, upd_list, del_list = self.netsuite.compare_users(wd_comp=wd_comp, ns_comp=ns_comp)
        self.logger.critical(f"Diff list {del_list}.")

        # remove terminated employees from add_list
        terminated = [x.Employee_ID for x in wd_workers if x.Employee_Status == '2']
        terminated_ids = set(terminated)
        add_list = [x for x in add_list if x not in terminated_ids]
        # add terminated records to update
        upd_list = upd_list + terminated

        # ========================================================
        # Step 4: Add rehires
        # ========================================================
        # Bound before the try block so Step 5 can still run (treating
        # everyone as a new hire) if this step fails early.
        rehires = []
        try:
            add_ids = set(add_list)
            # Workers to add whose most recent hire date is later than their
            # original one. The cheap membership tests run first so the dates
            # of unrelated workers are never parsed.
            rehires = [x for x in wd_workers
                       if x.Employee_ID in add_ids and x.Employee_Status == '1'
                       and self.compare_dates(x.Most_Recent_Hire_Date, x.Original_Hire_Date)]

            if Operations.rehired in operations:
                self.logger.critical("Step 4: Add rehires")
                self.netsuite.update(wd_workers=rehires,
                                     max_limit=max_limit,
                                     workers_dict=workers_dict,
                                     newEmployee=False,
                                     reHire=True,
                                     ns_workers=ns_workers,
                                     wd_comp=wd_comp,
                                     ns_comp=ns_comp,
                                     operation=Operations.rehired
                                     )
                self.netsuite.post_error_report("Adding rehires")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 4: Add rehires")

        # ========================================================
        # Step 5: Add new employees
        # ========================================================
        try:
            # remove the rehires from the add_list
            rehire_ids = {x.Employee_ID for x in rehires}
            add_list = [x for x in add_list if x not in rehire_ids]
            add_ids = set(add_list)
            wd_workers_add = [x for x in wd_workers
                              if x.Employee_ID in add_ids and x.Employee_Status == '1']

            # Add managers first
            manager_ids = {x.Manager_ID for x in wd_workers_add}
            wd_workers_add_managers = [x for x in wd_workers_add if x.Employee_ID in manager_ids]
            if Operations.add_new_manager in operations:
                self.logger.info("Step 5: Add new employees")
                self.netsuite.update(wd_workers=wd_workers_add_managers,
                                     max_limit=max_limit,
                                     workers_dict=workers_dict,
                                     newEmployee=True,
                                     ns_workers=ns_workers,
                                     operation=Operations.add_new_manager
                                     )
                self.netsuite.post_error_report("Adding new managers")

            # Adding non managers
            wd_workers_add = [x for x in wd_workers_add if x.Employee_ID not in manager_ids]
            if Operations.add_new_hire in operations:
                self.netsuite.update(wd_workers=wd_workers_add,
                                     max_limit=max_limit,
                                     workers_dict=workers_dict,
                                     newEmployee=True,
                                     ns_workers=ns_workers,
                                     operation=Operations.add_new_hire
                                     )
                self.netsuite.post_error_report("Adding new employees")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 5: Add new employees")

        # ========================================================
        # Step 6: International Transfers
        # ========================================================
        try:
            ret = self.workday.get_international_transfers(ns_workers, workers_dict)
            transfer_ids = {x.Employee_ID for x in ret}
            wd_workers_upd = [x for x in wd_workers if x.Employee_ID in transfer_ids]
            if Operations.international_transfer in operations:
                self.logger.info("Step 6: International Transfers")
                self.netsuite.update(wd_workers=wd_workers_upd,
                                     newEmployee=False,
                                     max_limit=max_limit,
                                     workers_dict=workers_dict,
                                     internationalTransfer=True,
                                     ns_workers=ns_workers,
                                     wd_comp=wd_comp,
                                     ns_comp=ns_comp,
                                     operation=Operations.international_transfer
                                     )
                self.netsuite.post_error_report("International Transfers")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 6: International Transfers")

        # ========================================================
        # Step 7: Update employees
        # ========================================================
        try:
            upd_ids = set(upd_list)
            wd_workers_upd = [x for x in wd_workers if x.Employee_ID in upd_ids]
            if Operations.update_employee in operations:
                self.logger.info("Step 7: Update employees")
                self.netsuite.update(wd_workers=wd_workers_upd,
                                     max_limit=max_limit,
                                     workers_dict=workers_dict,
                                     newEmployee=False,
                                     ns_workers=ns_workers,
                                     wd_comp=wd_comp,
                                     ns_comp=ns_comp,
                                     operation=Operations.update_employee
                                     )
                self.netsuite.post_error_report("Updating employes")
        except (APIAdaptorException, Exception) as e:
            self.logger.error(str(e))
            self.logger.critical("Failed on Step 7: Update employees")

        self.logger.info("End of Integration.")
def compare_and_save_data(wd_workers, upd_list, terminated, wd_comp, ns_comp):
    """Dump the field-level differences of updated workers to a CSV file.

    For every worker listed in ``upd_list`` and not in ``terminated`` both
    comparison strings are printed, then two rows are produced (one tagged
    "workday", one tagged "netsuite") that hold only the fields whose values
    differ. The rows are written to ``employees_upd.csv`` via ``data_to_csv``.

    Args:
        wd_workers: Workday worker objects (need an ``Employee_ID`` attribute).
        upd_list: Employee ids flagged for update.
        terminated: Employee ids to leave out.
        wd_comp: Employee id -> Workday comparison string.
        ns_comp: Employee id -> NetSuite comparison string.
    """
    rows = []
    selected = [w for w in wd_workers if w.Employee_ID in upd_list]
    selected = [w for w in selected if w.Employee_ID not in terminated]

    for worker in selected:
        print("---------------------------------------")
        wd_string = wd_comp[worker.Employee_ID]
        print(wd_string)
        ns_string = ns_comp[worker.Employee_ID]
        print(ns_string)

        # Two leading columns (id, source) followed by 18 field slots.
        wd_row = [worker.Employee_ID, "workday"] + [""] * 18
        ns_row = [worker.Employee_ID, "netsuite"] + [""] * 18

        field_pairs = zip(wd_string.split('|'), ns_string.split('|'))
        for slot, (wd_value, ns_value) in enumerate(field_pairs, start=2):
            if wd_value != ns_value:
                wd_row[slot] = wd_value
                ns_row[slot] = ns_value

        rows.append(wd_row)
        rows.append(ns_row)

    data_to_csv(rows, 'employees_upd.csv')
def data_to_csv(data, csv_file_path):
    """Write rows to a '|'-delimited text file with a fixed header row.

    Args:
        data (list of lists): Rows to write. Each row is expected to have
            20 values: employee id, source, then the 18 compared fields.
        csv_file_path (str): Path of the file to create.

    When ``data`` is empty only the header row is written.
    """
    import numpy as np

    # Names of the 18 compared fields, in comparison-string order.
    field_names = ["external_id", "Employee Type", "Original Hire Date", "Company",
                   "Manager ID", "Cost Center", "Email", "First Name", "Last Name",
                   "Country", "Employ Status", "Address 1",
                   "State", "City", "Zipcode", "Default Currency", "Payment Method",
                   "Class"]
    column_names = ["employee ID", "source"] + field_names

    header = np.array([column_names], dtype="object")
    data_array = np.array(data, dtype="object")
    if data_array.size == 0:
        # Stacking an empty array onto the header raises a shape mismatch.
        data_with_headers = header
    else:
        data_with_headers = np.vstack([header, data_array])

    np.savetxt(csv_file_path, data_with_headers, fmt='%s', delimiter='|', encoding='utf-8')
    print(f"CSV file created successfully at: {csv_file_path}")
def main(__name__, WorkdayToNetsuiteIntegration):
    """Parse the command line, configure logging and run the integration.

    Args:
        __name__: Logger name used for the start-up messages.
        WorkdayToNetsuiteIntegration: Callable returning an object with a
            ``run(max_limit)`` method.

    Command-line options:
        -l/--level: log level (default "info").
        -f/--max_limit: maximum number of changes per update call (default 100).
    """
    # The description previously named a different integration (copy-paste).
    parser = ArgumentParser(description="Workday to NetSuite Integration")
    parser.add_argument(
        "-l",
        "--level",
        action="store",
        help="log level (debug, info, warning, error, or critical)",
        type=str,
        default="info",
    )
    parser.add_argument(
        "-f",
        "--max_limit",
        action="store",
        type=int,
        help="limit the number of changes",
        default=100
    )
    args = parser.parse_args()

    Util.set_up_logging(args.level)

    logger = logging.getLogger(__name__)
    logger.info("Starting...")
    logger.info(f"max_limit={args.max_limit}")

    WD = WorkdayToNetsuiteIntegration()

    logger = logging.getLogger("main")
    logger.info('Starting Workday to Netsuite Integration ...')
    WD.run(args.max_limit)
# Run the integration only when this file is executed as a script; importing
# the module must not trigger it.
if __name__ == "__main__":
    main(__name__, WorkdayToNetsuiteIntegration)