in tools/cloud-composer-migration-complexity-assessment/dags/migration_assessment.py [0:0]
def _load_conversion_rules(bucket, root_path):
    """Return (manual_ops, automated_ops) sets from the v1-to-v2 rules CSV.

    Parses the CSV straight from GCS memory (no scratch file on the worker).
    Sets give O(1) membership tests in the per-task classification loop.
    """
    import io

    rules_csv = bucket.blob(
        f"{root_path}/v1-to-v2-report/rules.csv"
    ).download_as_text()
    rules_df = pd.read_csv(io.StringIO(rules_csv))
    # Keep the explicit == True / == False comparisons: the column may hold
    # non-boolean values, and ~(== True) would classify those differently.
    manuals = set(rules_df.loc[rules_df["Manual Intervention"] == True, "Operator"])
    automated = set(
        rules_df.loc[rules_df["Manual Intervention"] == False, "Operator"]
    )
    return manuals, automated


def _tally_work_estimate(work_estimate_text, manuals, automated):
    """Aggregate the newline-delimited-JSON work estimate into report metrics.

    Each non-blank line describes one DAG with ``work_hours_estimate``,
    ``tasks``, ``list_of_all_operators`` and ``list_of_unique_operators``
    (comma-joined operator names). Operators in ``manuals`` require manual
    intervention, operators in ``automated`` convert automatically, and
    anything unknown needs human review.

    Returns:
        dict with integer counters (num_dags, tasks, work_hours, num_converts,
        num_interventions, num_reviews) and the corresponding sets of unique
        operator names.
    """
    metrics = {
        "num_dags": 0,
        "tasks": 0,
        "work_hours": 0,
        "num_converts": 0,
        "num_interventions": 0,
        "num_reviews": 0,
        "unique_operators": set(),
        "unique_conversions": set(),
        "unique_interventions": set(),
        "unique_reviews": set(),
    }
    for line in work_estimate_text.splitlines():
        if not line.strip():
            # Tolerate blank/trailing lines in the NDJSON export instead of
            # crashing in json.loads.
            continue
        entry = json.loads(line)
        metrics["num_dags"] += 1
        metrics["work_hours"] += int(entry["work_hours_estimate"])
        metrics["tasks"] += int(entry["tasks"])
        for operator in str(entry["list_of_all_operators"]).split(","):
            # strip() so '", "'-separated lists still match the ruleset.
            operator = operator.strip()
            if operator in manuals:
                metrics["num_interventions"] += 1
            elif operator in automated:
                metrics["num_converts"] += 1
            else:
                metrics["num_reviews"] += 1
        for operator in str(entry["list_of_unique_operators"]).split(","):
            operator = operator.strip()
            metrics["unique_operators"].add(operator)
            if operator in manuals:
                metrics["unique_interventions"].add(operator)
            elif operator in automated:
                metrics["unique_conversions"].add(operator)
            else:
                metrics["unique_reviews"].add(operator)
    return metrics


def _render_report(
    metrics,
    new_work_hours,
    effort_reduction,
    upgrade_check_data,
    summary_data,
    detailed_summary_data,
):
    """Build and return the dominate HTML document for the assessment report."""

    def _operator_section(heading, blurb, operators):
        # One <h3> + <p> + <ul> per task category. Tags instantiated while the
        # enclosing ``with document(...)`` is active attach to that document.
        h3(heading)
        p(blurb)
        listing = ul()
        for name in operators:
            listing += li(name)

    with document(title="Migration Complexity Assessment") as doc:
        style(
            """
        body {
            font-family: math;
        }
        """
        )
        h1("Composer Migration Complexity Assessment")
        h2("Summary")
        p(f"Original Total Work Hours Estimate: {metrics['work_hours']}")
        p(f"Number of Active DAGs: {metrics['num_dags']}")
        _operator_section(
            "All Tasks",
            f"{metrics['tasks']} total task(s) consisting of these unique operator types:",
            metrics["unique_operators"],
        )
        _operator_section(
            "Automated Conversions",
            f"{metrics['num_converts']} task(s) automatically converted consisting of these unique operator types:",
            metrics["unique_conversions"],
        )
        _operator_section(
            "Manual Interventions",
            f"{metrics['num_interventions']} task(s) require manual intervention consisting of these unique operator types:",
            metrics["unique_interventions"],
        )
        _operator_section(
            "Need Review",
            f"{metrics['num_reviews']} task(s) need review consisting of these unique operator types:",
            metrics["unique_reviews"],
        )
        h3("New Work Estimate")
        p(f"Total Work Hours: {new_work_hours}")
        p(f"{effort_reduction}% change in estimated work hours.")
        p("formula: original_hours-(converted_tasks/total_tasks*original_hours)/2")
        h2("Tooling Logs")
        h3("Google PSO Airflow V1 to V2")
        p(
            "The objective of the tool is to automate the upgrade of DAG files from Composer-v1 (Airflow 1.x) to Composer-v2 (Airflow 2.x). This process involves handling changes in import statements, operators, and their arguments. By automating the migration, the tool saves time, ensures consistency, reduces errors, and minimizes administrative overhead associated with manual upgrades."
        )
        pre(str(summary_data))
        pre(str(detailed_summary_data))
        h3("Airflow Upgrade Check")
        p(
            "This shows a number of action items that you should follow before upgrading to 2.0.0 or above."
        )
        pre(str(upgrade_check_data))
    return doc


def full_migration_complexity(**context):
    """
    Build and upload the Composer migration-complexity report.

    Reads the per-DAG work estimate and the v1-to-v2 conversion ruleset from
    GCS, classifies every task as automatically convertible / needing manual
    intervention / needing review, derives a revised work-hours estimate, and
    renders everything (plus the upgrade-check and v1-to-v2 tool logs) into a
    single HTML page uploaded to
    ``<gcs_root_path>/full_migration_complexity.html``.

    Args:
        **context: Airflow task context. ``context["templates_dict"]`` must
            provide ``gcs_bucket`` (bucket name) and ``gcs_root_path``
            (object prefix under which all artifacts live).
    """
    # Instantiate a Google Cloud Storage client and locate the artifacts.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(context["templates_dict"]["gcs_bucket"])
    root_path = context["templates_dict"]["gcs_root_path"]

    work_estimate_text = bucket.blob(
        f"{root_path}/work_estimate.json"
    ).download_as_text()

    manuals, automated = _load_conversion_rules(bucket, root_path)
    metrics = _tally_work_estimate(work_estimate_text, manuals, automated)

    # Revised estimate: automated conversions are assumed to halve the effort
    # of their proportional share of the original estimate:
    #   original_hours - (converted_tasks / total_tasks * original_hours) / 2
    tasks = metrics["tasks"]
    work_hours = metrics["work_hours"]
    if tasks and work_hours:
        new_work_hours = round(
            work_hours - (metrics["num_converts"] / tasks * work_hours) / 2, 2
        )
        effort_reduction = round((1 - new_work_hours / work_hours) * 100, 2)
    else:
        # Empty work estimate: nothing to reduce (and avoids ZeroDivisionError).
        new_work_hours = work_hours
        effort_reduction = 0.0

    # Raw tool output, embedded verbatim in the report.
    upgrade_check_data = bucket.blob(
        f"{root_path}/upgrade-check/results"
    ).download_as_text()
    summary_data = bucket.blob(
        f"{root_path}/v1-to-v2-report/Summary-Report.txt"
    ).download_as_text()
    detailed_summary_data = bucket.blob(
        f"{root_path}/v1-to-v2-report/Detailed-Report.txt"
    ).download_as_text()

    doc = _render_report(
        metrics,
        new_work_hours,
        effort_reduction,
        upgrade_check_data,
        summary_data,
        detailed_summary_data,
    )

    # Upload the rendered HTML directly from memory — no scratch file needed.
    bucket.blob(f"{root_path}/full_migration_complexity.html").upload_from_string(
        doc.render(), content_type="text/html"
    )