tools/cloud-composer-migration-complexity-assessment/dags/migration_assessment.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DAG to scope the estimated work effort for an Airflow 1 to Airflow 2 migration.

Collects Airflow metadata for an inventory, runs diagnostic tools, writes all
outputs to a Google Cloud Storage location, and generates a final migration
assessment report.
"""
import time
import os
import json
from datetime import datetime, timedelta

from google.cloud import storage
import pandas as pd
from dominate import document
from dominate.tags import h1, h2, h3, pre, p, ul, li, style

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator

GCS_BUCKET = "your-bucket-name"

default_args = {
    "owner": "auditing",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}


# -------------------------
# Callback Functions
# -------------------------
def full_migration_complexity(**context):
    """Read the tooling outputs from GCS and compute a migration complexity assessment."""
    # Instantiate a Google Cloud Storage client and resolve the bucket and
    # report paths passed in through templates_dict.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(context["templates_dict"]["gcs_bucket"])
    root_path = context["templates_dict"]["gcs_root_path"]

    work_estimate_blob = bucket.blob(f"{root_path}/work_estimate.json")
    work_estimate_text = work_estimate_blob.download_as_text()

    # -------------------------------------------------------------------------
    # Retrieve the ruleset of operators that can be converted automatically vs.
    # those that require manual intervention.
    # -------------------------------------------------------------------------
    rules_blob = bucket.blob(f"{root_path}/v1-to-v2-report/rules.csv")
    rules_blob.download_to_filename("rules.csv")
    rules_df = pd.read_csv("rules.csv")
    manuals = rules_df.loc[rules_df["Manual Intervention"] == True, "Operator"].tolist()
    automated = rules_df.loc[
        rules_df["Manual Intervention"] == False, "Operator"
    ].tolist()

    # -------------------------------------------------------------------------
    # Using the ruleset and the original work estimate, generate metrics on how
    # many DAGs, tasks, and hours are involved, and how many tasks can be
    # converted automatically, need manual intervention, or need review.
    # -------------------------------------------------------------------------
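    # Each line of work_estimate.json is a newline-delimited JSON row exported
    # by MySqlToGoogleCloudStorageOperator and parsed line by line below.
    # An illustrative (not real) row:
    # {"dag_id": "example_dag", "tasks": 8, "work_hours_estimate": 2,
    #  "list_of_all_operators": "BashOperator,BashOperator,PythonOperator",
    #  "list_of_unique_operators": "BashOperator,PythonOperator", ...}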
    tasks = work_hours = num_dags = num_converts = num_interventions = num_reviews = 0
    unique_operators = set()
    unique_interventions = set()
    unique_conversions = set()
    unique_reviews = set()
    for line in work_estimate_text.splitlines():
        entry = json.loads(line)
        num_dags += 1
        work_hours += int(entry["work_hours_estimate"])
        tasks += int(entry["tasks"])
        for operator in str(entry["list_of_all_operators"]).split(","):
            if operator in manuals:
                num_interventions += 1
            elif operator in automated:
                num_converts += 1
            else:
                num_reviews += 1
        for operator in str(entry["list_of_unique_operators"]).split(","):
            unique_operators.add(operator)
            if operator in manuals:
                unique_interventions.add(operator)
            elif operator in automated:
                unique_conversions.add(operator)
            else:
                unique_reviews.add(operator)

    # -------------------------------------------------------------------------
    # Calculate the new work estimate
    # -------------------------------------------------------------------------
    new_work_hours = round(work_hours - (num_converts / tasks * work_hours) / 2, 2)
    effort_reduction = round((1 - new_work_hours / work_hours) * 100, 2)
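    # Illustrative example of the estimate above (hypothetical numbers): with
    # 100 original work hours, 80 total tasks, and 40 automatically converted
    # tasks, new_work_hours = 100 - (40 / 80 * 100) / 2 = 75.0, i.e. a 25%
    # reduction in estimated effort.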

    # -------------------------------------------------------------------------
    # Build the final report HTML file
    # -------------------------------------------------------------------------
    upgrade_check_blob = bucket.blob(f"{root_path}/upgrade-check/results")
    upgrade_check_data = upgrade_check_blob.download_as_text()

    summary_blob = bucket.blob(f"{root_path}/v1-to-v2-report/Summary-Report.txt")
    summary_data = summary_blob.download_as_text()

    detailed_summary_blob = bucket.blob(
        f"{root_path}/v1-to-v2-report/Detailed-Report.txt"
    )
    detailed_summary_data = detailed_summary_blob.download_as_text()

    with document(title="Migration Complexity Assessment") as doc:
        style(
            """
            body {
                font-family: math;
            }
            """
        )
        h1("Composer Migration Complexity Assessment")

        h2("Summary")
        p(f"Original Total Work Hours Estimate: {str(work_hours)}")
        p(f"Number of Active DAGs: {str(num_dags)}")

        h3("All Tasks")
        p(f"{str(tasks)} total task(s) consisting of these unique operator types:")
        uo_list = ul()
        for item in list(unique_operators):
            uo_list += li(item)

        h3("Automated Conversions")
        p(
            f"{str(num_converts)} task(s) automatically converted consisting of these unique operator types:"
        )
        c_list = ul()
        for item in list(unique_conversions):
            c_list += li(item)

        h3("Manual Interventions")
        p(
            f"{str(num_interventions)} task(s) require manual intervention consisting of these unique operator types:"
        )
        i_list = ul()
        for item in list(unique_interventions):
            i_list += li(item)

        h3("Need Review")
        p(
            f"{str(num_reviews)} task(s) need review consisting of these unique operator types:"
        )
        r_list = ul()
        for item in unique_reviews:
            r_list += li(item)

        h3("New Work Estimate")
        p(f"Total Work Hours: {str(new_work_hours)}")
        p(f"{effort_reduction}% change in estimated work hours.")
        p("formula: original_hours - (converted_tasks / total_tasks * original_hours) / 2")

        h2("Tooling Logs")

        h3("Google PSO Airflow V1 to V2")
        p(
            "The objective of the tool is to automate the upgrade of DAG files from Composer-v1 (Airflow 1.x) to Composer-v2 (Airflow 2.x). This process involves handling changes in import statements, operators, and their arguments. By automating the migration, the tool saves time, ensures consistency, reduces errors, and minimizes administrative overhead associated with manual upgrades."
        )
        pre(str(summary_data))
        pre(str(detailed_summary_data))

        h3("Airflow Upgrade Check")
        p(
            "This shows a number of action items that you should follow before upgrading to 2.0.0 or above."
        )
        pre(str(upgrade_check_data))

    with open("full_migration_complexity.html", "w") as html_file:
        html_file.write(doc.render())

    res_blob = bucket.blob(f"{root_path}/full_migration_complexity.html")
    res_blob.upload_from_filename("full_migration_complexity.html")


# -------------------------------------------------------------------------
# Begin creating DAG
# -------------------------------------------------------------------------
with models.DAG(
    "test_airflow_migration_assessment_v1_0",
    tags=["airflow_migration_assessment"],
    description="assess migration scope for airflow v1 to v2",
    is_paused_upon_creation=True,
    catchup=False,
    start_date=datetime(2023, 8, 10),
    dagrun_timeout=timedelta(minutes=30),
    max_active_runs=1,
    default_args=default_args,
    schedule_interval="0 0 * * *",  # daily at 00:00
) as dag:
    TIMESTAMP = time.strftime("%Y%m%d")
    AIRFLOW_HOME_DIR = os.environ["DAGS_FOLDER"]
    GCS_ROOT_PATH = f"migration-assessment/{TIMESTAMP}"

    # -----------------------------------
    # Collect Inventory from Airflow DB
    # -----------------------------------
    dag_query = """
        SELECT *, {processed_ts} as processed_ts
        FROM dag
        WHERE is_active=1
    """.format(
        processed_ts=TIMESTAMP
    )

    collect_dag_inventory = MySqlToGoogleCloudStorageOperator(
        task_id="collect_dag_inventory",
        mysql_conn_id="airflow_db",
        provide_context=True,
        sql=dag_query,
        bucket=GCS_BUCKET,
        filename=f"migration-assessment/{TIMESTAMP}/inventory/dags.json",
    )

    task_instance_query = """
        SELECT DISTINCT ti.dag_id, operator, {processed_ts} as processed_ts
        FROM task_instance ti
        JOIN dag d on d.dag_id = ti.dag_id
        WHERE d.is_active=1
    """.format(
        processed_ts=TIMESTAMP
    )

    collect_task_inventory = MySqlToGoogleCloudStorageOperator(
        task_id="collect_task_inventory",
        mysql_conn_id="airflow_db",
        provide_context=True,
        sql=task_instance_query,
        bucket=GCS_BUCKET,
        filename=f"migration-assessment/{TIMESTAMP}/inventory/tasks.json",
    )

    operator_query = """
        SELECT ti.operator, count(ti.dag_id) as occurrences, {processed_ts} as processed_ts
        FROM (SELECT distinct dag_id, operator FROM task_instance) ti
        INNER JOIN dag as d ON d.dag_id = ti.dag_id
        WHERE d.is_active=1
        GROUP BY ti.operator
    """.format(
        processed_ts=TIMESTAMP
    )

    collect_operator_inventory = MySqlToGoogleCloudStorageOperator(
        task_id="collect_operator_inventory",
        mysql_conn_id="airflow_db",
        provide_context=True,
        sql=operator_query,
        bucket=GCS_BUCKET,
        filename=f"migration-assessment/{TIMESTAMP}/inventory/operators.json",
    )

    # -----------------------------------
    # Run Upgrade Check and Output to GCS
    # -----------------------------------
    # update this to the recommended documentation
    upgrade_cmds = """
        rm -rf upgrade-check
        mkdir -p upgrade-check
        airflow upgrade_check > upgrade-check/results
        gsutil cp -r upgrade-check gs://{bucket}/{root_path}/
    """.format(
        bucket=GCS_BUCKET, root_path=GCS_ROOT_PATH
    )

    run_upgrade_check = BashOperator(
        task_id="run_upgrade_check",
        bash_command=upgrade_cmds,
    )

    # -----------------------------------
    # Run Airflow v1 to v2 Migration code
    # -----------------------------------
    v1_to_v2_cmds = """
        rm -rf v1-to-v2-report
        mkdir -p v1-to-v2-report
        cp {root_dir}/airflow-v1-to-v2-migration/migration_rules/rules.csv v1-to-v2-report/rules.csv
        python3 {root_dir}/airflow-v1-to-v2-migration/run_mig.py \
            --input_dag_folder={root_dir} \
            --output_dag_folder=v1-to-v2-report \
            --rules_file={root_dir}/airflow-v1-to-v2-migration/migration_rules/rules.csv
        gsutil cp -r v1-to-v2-report gs://{gcs_bucket}/{root_path}/
        gsutil rm gs://{gcs_bucket}/{root_path}/v1-to-v2-report/*.py
    """.format(
        root_dir=AIRFLOW_HOME_DIR, gcs_bucket=GCS_BUCKET, root_path=GCS_ROOT_PATH
    )

    run_airflow_v1_to_v2 = BashOperator(
        task_id="run_airflow_v1_to_v2",
        bash_command=v1_to_v2_cmds,
    )

    # -----------------------------------
    # Run Work Estimate
    # -----------------------------------
    work_estimate_query = """
        SELECT
            dag_id,
            dag_size,
            complexity_score,
            CASE
                WHEN complexity_score <= 100 THEN 'Simple'
                WHEN complexity_score > 100 AND complexity_score < 5000 THEN 'Medium'
                ELSE 'Complex'
            END as complexity,
            CASE
                WHEN complexity_score <= 100 THEN 1
                WHEN complexity_score > 100 AND complexity_score < 5000 THEN 2
                ELSE 4
            END as work_hours_estimate,
            unique_operators,
            list_of_unique_operators,
            tasks,
            list_of_all_operators,
            {processed_ts} as processed_ts
        FROM (
            SELECT
                sd.dag_id,
                length(data) as dag_size,
                uo.unique_operators,
                uo.list_of_unique_operators,
                uo.list_of_all_operators,
                tasks,
                round((tasks * uo.unique_operators) * length(data) / 10000, 2) as complexity_score
            FROM serialized_dag sd
            JOIN (
                SELECT distinct dag_id,
                    count(distinct operator, dag_id) AS unique_operators,
                    group_concat(distinct operator) as list_of_unique_operators,
                    group_concat(operator) as list_of_all_operators
                FROM task_instance
                GROUP BY dag_id, execution_date
            ) uo ON sd.dag_id = uo.dag_id
            JOIN (
                SELECT dag_id, count(distinct dag_id, task_id) AS tasks
                FROM task_instance
                GROUP BY dag_id
            ) t ON sd.dag_id = t.dag_id
        ) c
    """.format(
        processed_ts=TIMESTAMP
    )
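    # Illustrative reading of complexity_score above (hypothetical numbers): a
    # DAG with 10 tasks, 4 unique operators, and a 20,000-character serialized
    # definition scores (10 * 4) * 20000 / 10000 = 80, landing in the 'Simple'
    # tier (1 estimated work hour); scores between 100 and 5000 map to 'Medium'
    # (2 hours), and higher scores to 'Complex' (4 hours).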

    generate_work_estimate = MySqlToGoogleCloudStorageOperator(
        task_id="generate_work_estimate",
        mysql_conn_id="airflow_db",
        provide_context=True,
        sql=work_estimate_query,
        bucket=GCS_BUCKET,
        filename=f"migration-assessment/{TIMESTAMP}/work_estimate.json",
    )

    full_migration_complexity_task = PythonOperator(
        task_id="full_migration_complexity",
        python_callable=full_migration_complexity,
        provide_context=True,
        templates_dict={"gcs_bucket": GCS_BUCKET, "gcs_root_path": GCS_ROOT_PATH},
    )

    [
        collect_dag_inventory,
        collect_task_inventory,
        collect_operator_inventory,
        run_upgrade_check,
        run_airflow_v1_to_v2,
        generate_work_estimate,
    ] >> full_migration_complexity_task
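
# Deployment assumptions (not enforced by this DAG): GCS_BUCKET is replaced
# with a bucket the environment's service account can write to, the
# airflow-v1-to-v2-migration tool is copied under the DAGs folder (DAGS_FOLDER),
# the `airflow upgrade_check` command is available (it ships in the separate
# apache-airflow-upgrade-check package for Airflow 1.10.x), and the `airflow_db`
# connection points at the environment's metadata database.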