dags/eam_workday_netsuite.py (177 lines of code) (raw):

from datetime import datetime, timedelta from airflow import DAG from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag DOCS = """ ### Workday/Netsuite integration Runs a script in docker image that syncs employee data from Workday to Netsuite. It creates a Jira ticket if the task fails. [docker-etl](https://github.com/mozilla/docker-etl/tree/main/jobs/eam-integrations) [telemetry-airflow](https://github.com/mozilla/telemetry-airflow/tree/main/dags/eam_workday_netsuite_integration.py) This DAG requires the creation of an Airflow Jira connection. #### Owner jmoscon@mozilla.com #### Tags * impact/tier_3 * repo/telemetry-airflow * triage/record_only """ def get_airflow_log_link(context): import urllib.parse dag_run_id = context["dag_run"].run_id task_id = context["task_instance"].task_id base_url = "http://workflow.telemetry.mozilla.org/dags/" base_url += "eam-workday-xmatters-integration/grid?tab=logs&dag_run_id=" return base_url + f"{urllib.parse.quote(dag_run_id)}&task_id={task_id}" def create_jira_ticket(context): import json import logging import requests from airflow.providers.atlassian.jira.hooks.jira import JiraHook from requests.auth import HTTPBasicAuth logger = logging.getLogger(__name__) logger.info("Creating Jira ticket ...") conn_id = "eam_jira_connection_id" conn = JiraHook( jira_conn_id=conn_id, ).get_connection(conn_id) log_url = get_airflow_log_link(context) jira_domain = "mozilla-hub.atlassian.net" url = f"https://{jira_domain}/rest/api/3/issue" headers = {"Accept": "application/json", "Content-Type": "application/json"} auth = HTTPBasicAuth(conn.login, conn.password) summary = "Workday Netsuite Integration - Airflow Task Issue Exception" paragraph_text = "Detailed error logging can be found in the link: " project_key = "ASP" issue_type_id = "10020" # Issue Type = Bug assignee_id = "712020:b999000a-67b1-45ff-8b40-42a5ceeee75b" # Julio payload = json.dumps( { "fields": { "assignee": {"id": assignee_id}, "project": {"key": project_key}, "summary": summary, "description": { "type": "doc", "version": 1, "content": [ { "type": "paragraph", "content": [ { "type": "text", "text": paragraph_text, }, { "type": "text", "text": "Mozilla-Telemetry log.", "marks": [ { "type": "link", "attrs": {"href": f"{log_url}"}, } ], }, ], } ], }, "issuetype": {"id": issue_type_id}, } } ) response = requests.post(url, headers=headers, auth=auth, data=payload) logger.info(f"response.text={response.text}") if response.status_code == 201: logger.info("Issue created successfully.") return response.json() else: logger.info( f"Failed to create issue. Status code:" f"{response.status_code}, Response: {response.text}" ) return None default_args = { "owner": "jmoscon@mozilla.com", "emails": ["jmoscon@mozilla.com"], "start_date": datetime(2024, 1, 1), "retries": 3, # wait 5 min before retry "retry_delay": timedelta(minutes=5), "on_failure_callback": create_jira_ticket, } tags = [Tag.ImpactTier.tier_3, Tag.Triage.record_only, Tag.Repo.airflow] NETSUITE_INTEG_WORKDAY_USERNAME = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_WORKDAY_USERNAME", secret="airflow-gke-secrets", key="NETSUITE_INTEG_WORKDAY_USERNAME", ) NETSUITE_INTEG_WORKDAY_PASSWORD = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_WORKDAY_PASSWORD", secret="airflow-gke-secrets", key="NETSUITE_INTEG_WORKDAY_PASSWORD", ) NETSUITE_INTEG_WORKDAY_LISTING_OF_WORKERS_LINK = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_WORKDAY_LISTING_OF_WORKERS_LINK", secret="airflow-gke-secrets", key="NETSUITE_INTEG_WORKDAY_LISTING_OF_WORKERS_LINK", ) NETSUITE_INTEG_WORKDAY_INTERNATIONAL_TRANSFER_LINK = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_WORKDAY_INTERNATIONAL_TRANSFER_LINK", secret="airflow-gke-secrets", key="NETSUITE_INTEG_WORKDAY_INTERNATIONAL_TRANSFER_LINK", ) NETSUITE_INTEG_NETSUITE_CONSUMER_KEY = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_CONSUMER_KEY", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_CONSUMER_KEY", ) NETSUITE_INTEG_NETSUITE_CONSUMER_SECRET = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_CONSUMER_SECRET", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_CONSUMER_SECRET", ) NETSUITE_INTEG_NETSUITE_TOKEN_ID = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_TOKEN_ID", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_TOKEN_ID", ) NETSUITE_INTEG_NETSUITE_TOKEN_SECRET = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_TOKEN_SECRET", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_TOKEN_SECRET", ) NETSUITE_INTEG_NETSUITE_TOKEN_OAUTH_REALM = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_TOKEN_OAUTH_REALM", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_TOKEN_OAUTH_REALM", ) NETSUITE_INTEG_NETSUITE_HOST = Secret( deploy_type="env", deploy_target="NETSUITE_INTEG_NETSUITE_HOST", secret="airflow-gke-secrets", key="NETSUITE_INTEG_NETSUITE_HOST", ) with DAG( "eam-workday-netsuite-integration", default_args=default_args, doc_md=DOCS, tags=tags, # 12:00 AM PST - M-F schedule_interval="0 8 * * 1-5", ) as dag: workday_netsuite_dag = GKEPodOperator( task_id="eam_workday_netsuite", arguments=["python", "scripts/workday_netsuite_integration.py", "--level", "info"], image="gcr.io/moz-fx-data-airflow-prod-88e0/" + "eam-integrations_docker_etl:latest", gcp_conn_id="google_cloud_airflow_gke", secrets=[ NETSUITE_INTEG_WORKDAY_USERNAME, NETSUITE_INTEG_WORKDAY_PASSWORD, NETSUITE_INTEG_WORKDAY_LISTING_OF_WORKERS_LINK, NETSUITE_INTEG_WORKDAY_INTERNATIONAL_TRANSFER_LINK, NETSUITE_INTEG_NETSUITE_CONSUMER_KEY, NETSUITE_INTEG_NETSUITE_CONSUMER_SECRET, NETSUITE_INTEG_NETSUITE_TOKEN_ID, NETSUITE_INTEG_NETSUITE_TOKEN_SECRET, NETSUITE_INTEG_NETSUITE_TOKEN_OAUTH_REALM, NETSUITE_INTEG_NETSUITE_HOST ], )