tools/airflow-v1-to-v2-migration/airflow_migration/migration.py (188 lines of code) (raw):
import csv
import glob
from pathlib import Path
import re
from prettytable import PrettyTable
def clean_input(arg):
    """Return *arg* with surrounding whitespace trimmed and every newline character removed."""
    trimmed = arg.strip()
    # Joining the newline-separated pieces drops each '\n' embedded in the value.
    return "".join(trimmed.split('\n'))
def parse_import_statement(import_statement):
    """Split a single-line Python import statement into its module and imported names.

    Recognised forms (starting at column 0): ``import a, b``,
    ``from pkg import a, b`` and ``from pkg import (a, b)``. ``as`` aliases are
    accepted but discarded; only the imported names are returned.

    Args:
        import_statement: one source line, optionally ending with a newline.

    Returns:
        ``(module, names)`` where *module* is the ``from`` module, or ``None``
        for a plain ``import``. Returns ``(None, None)`` when the line is not an
        import statement of a recognised form.
    """
    found = re.search(r'^(?:from\s+([\w.]+)\s+)?import\s+(?:\(([^)]+)\)|([\w., ]+))$', import_statement)
    if not found:
        return None, None
    module = found.group(1)
    names_text = found.group(2) or found.group(3)
    # Each match captures the imported name and swallows an optional "as alias" suffix.
    names = re.findall(r'([\w.]+)(?:\s+as\s+[\w.]+)?', names_text) if names_text else names_text
    return module, names
class MigrationUtility:
    """Rewrite Airflow 1.x DAG files using rules loaded from a CSV file.

    A rule key (CSV column 2) is one of:

    * an import statement such as ``from pkg import Name`` -- the statement is
      replaced by the rule's replacement text;
    * a called name such as ``BigQueryOperator`` -- its call sites are renamed
      to the replacement text;
    * a called name followed by ``(`` such as ``BigQueryOperator(`` -- the
      replacement text is inserted as an extra argument of that call.

    Migrated DAGs are written to ``<output_dir>/<stem>_v2.py``; the summary and
    (optionally) detailed reports are written to ``output_dir``.
    """

    # Matches "name as alias" pairs inside an import statement.
    _ALIAS_RE = re.compile(r'([\w.]+)\s+as\s+(\w+)')
    # A replacement importing exactly one name, so an original alias can be appended to it.
    _SINGLE_IMPORT_RE = re.compile(r'(?:from\s+[\w.]+\s+)?import\s+[\w.]+')

    def __init__(self, input_dir, output_dir, rules_file, add_comments, comments, report_generation):
        # rule key -> (col0, change type (col1), replacement text (col3), impact (col4), col5, col6)
        self.replacement_dict = {}
        self.rules_file = rules_file
        self.input_dir = input_dir
        self.output_dir = output_dir
        # When truthy, a comment line is written before every migrated line.
        self.add_comments = add_comments
        # Custom comment text; when empty, the comment is generated from the rule.
        self.comments = comments
        # When truthy, the detailed report is written in addition to the summary.
        self.report_generation = report_generation
        # Captures called names, including dotted ones such as ``models.DAG``.
        self.function_regex = r'(\w+(?:\.\w+)*)\('

    def load_rules(self):
        """Load the CSV rules file into ``replacement_dict``.

        Column 2 of each row is the rule key; columns 0, 1, 3, 4, 5 and 6 are
        cleaned with ``clean_input`` and stored as the rule's value tuple.
        Later rows override earlier rows with the same key. Blank rows are
        skipped.

        Raises:
            ValueError: if a non-blank row has fewer than seven columns.
        """
        # newline='' is what the csv module expects so quoted fields are parsed correctly.
        with open(self.rules_file, 'r', newline='') as f:
            for row_number, row in enumerate(csv.reader(f), start=1):
                if not any(cell.strip() for cell in row):
                    continue
                if len(row) < 7:
                    raise ValueError(f"{self.rules_file}: rule row {row_number} has {len(row)} "
                                     f"column(s); expected at least 7")
                self.replacement_dict[row[2].strip()] = (
                    clean_input(row[0]), clean_input(row[1]), clean_input(row[3]),
                    clean_input(row[4]), clean_input(row[5]), clean_input(row[6]))

    def generate_summary_report(self, total_num_dag, total_change_num, imp_num, imp_op_num, imp_op_arg_num,
                                impacted_imp_files, impacted_imp_operator_files, impacted_imp_operator_arg_files):
        """Write ``Summary-Report.txt`` (change counts and impacted DAG lists) to ``output_dir``."""
        summary_report = PrettyTable()
        summary_report.title = "SUMMARY REPORT"
        summary_report.field_names = ["DESCRIPTION", "INFO"]
        for field in summary_report.field_names:
            summary_report.align[field] = "l"
        # Counts
        summary_report.add_row(["Total number of DAG's", total_num_dag])
        summary_report.add_row(["Total number of DAG's with changes: ", total_change_num])
        summary_report.add_row(["Total number of DAG's with import changes: ", imp_num])
        summary_report.add_row(["Total number of DAG's with import and operator changes: ", imp_op_num])
        summary_report.add_row(
            ["Total number of DAG's with import, operator and argument changes: ", imp_op_arg_num])
        # Blank rows for spacing
        for _ in range(2):
            summary_report.add_row(["", ""])
        # Impacted file lists
        summary_report.add_row(["Impacted DAG's with import changes", list(impacted_imp_files)])
        summary_report.add_row(["_" * 20, "_" * 20])
        summary_report.add_row(["Impacted DAG's with import and operator changes",
                                list(impacted_imp_operator_files)])
        summary_report.add_row(["_" * 20, "_" * 20])
        summary_report.add_row(["Impacted DAG's with import, operator and argument changes",
                                list(impacted_imp_operator_arg_files)])
        summary_report_file = f"{self.output_dir}/Summary-Report.txt"
        with open(summary_report_file, "w", encoding="utf-8") as sum_report:
            sum_report.write(str(summary_report))

    def generate_detailed_report(self, line_changes):
        """Write ``Detailed-Report.txt`` with one row per entry of *line_changes* to ``output_dir``.

        Each entry is ``[dag file, automated flag, change type, line number,
        old statement, new statement]``.
        """
        detailed_report = PrettyTable()
        detailed_report.title = "DETAILED REPORT"
        detailed_report.field_names = ["DAG FILE", "AUTOMATED", "CHANGE_TYPE", "LINE_NO", "OLD_STATEMENT",
                                       "NEW_STATEMENT"]
        for field in detailed_report.field_names:
            detailed_report.align[field] = "l"
        detailed_report.max_table_width = 300
        for row in line_changes:
            detailed_report.add_row(row)
        detailed_report_file = f"{self.output_dir}/Detailed-Report.txt"
        with open(detailed_report_file, "w", encoding="utf-8") as det_report:
            det_report.write(str(detailed_report))

    def migrate_files(self):
        """Migrate every ``*.py`` file directly inside ``input_dir`` and write the reports.

        The summary report is always written; the detailed report only when
        ``report_generation`` is truthy. Each changed DAG is counted in exactly
        one summary tier, chosen by the most significant change it received:
        argument change, else operator change, else import change.
        """
        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
        impacted_imp_files = []
        impacted_imp_op_files = []
        impacted_imp_op_arg_files = []
        line_changes = []  # one row per change, for the detailed report
        # Sorted so that the reports list DAGs in a stable order.
        dag_paths = sorted(glob.glob(f"{self.input_dir}/*.py"))
        for filepath in dag_paths:
            applied = self._migrate_file(filepath, line_changes)
            dag_file = Path(filepath).stem + '.py'
            if "Argument_Change" in applied:
                impacted_imp_op_arg_files.append(dag_file)
            elif "Operator_Change" in applied:
                impacted_imp_op_files.append(dag_file)
            elif "Import_Change" in applied:
                impacted_imp_files.append(dag_file)
        imp_num = len(impacted_imp_files)
        imp_op_num = len(impacted_imp_op_files)
        imp_op_arg_num = len(impacted_imp_op_arg_files)
        self.generate_summary_report(len(dag_paths), imp_num + imp_op_num + imp_op_arg_num, imp_num, imp_op_num,
                                     imp_op_arg_num, impacted_imp_files, impacted_imp_op_files,
                                     impacted_imp_op_arg_files)
        if self.report_generation:
            self.generate_detailed_report(line_changes)

    def _migrate_file(self, filepath, line_changes):
        """Write ``<output_dir>/<stem>_v2.py`` for one DAG, appending its report rows to *line_changes*.

        Returns:
            The set of change types ("Import_Change", "Operator_Change",
            "Argument_Change") applied to the file.
        """
        stem = Path(filepath).stem
        dag_file = stem + '.py'
        new_file = f"{self.output_dir}/{stem}_v2.py"
        applied = set()
        with open(filepath, 'r', encoding='utf-8') as src, open(new_file, 'w', encoding='utf-8') as out:
            for line_number, line in enumerate(src, start=1):
                # Comment lines (indented or not) are copied verbatim.
                # TODO: multi-line strings/docstrings are still scanned like code.
                if line.lstrip().startswith('#'):
                    out.write(line)
                    continue
                mod_name, imported_names = parse_import_statement(line)
                if imported_names is not None:
                    if self._migrate_import(line, line_number, mod_name, imported_names, dag_file, out,
                                            line_changes):
                        applied.add("Import_Change")
                else:
                    applied |= self._migrate_calls(line, line_number, dag_file, out, line_changes)
        return applied

    def _migrate_import(self, line, line_number, mod_name, imported_names, dag_file, out, line_changes):
        """Write the migrated form of one import line; return True if any rule matched.

        Each imported name is looked up as ``[from <mod_name> ]import <name>``.
        If no name has a rule, *line* is copied verbatim so its aliases and
        layout are preserved. Otherwise the statement is split into one import
        per name: names with a rule are replaced by the rule text, the others
        are re-emitted as single imports keeping their ``as`` alias. When a
        replaced name had an alias and the replacement imports a single name,
        the alias is carried over so existing references keep working.
        """
        prefix = 'from ' + mod_name + ' ' if mod_name else ''
        keys = [prefix + 'import ' + name for name in imported_names]
        if not any(key in self.replacement_dict for key in keys):
            out.write(line)
            return False
        aliases = dict(self._ALIAS_RE.findall(line))
        for name, key in zip(imported_names, keys):
            alias = ' as ' + aliases[name] if name in aliases else ''
            if key not in self.replacement_dict:
                out.write(key + alias + '\n')
                continue
            replacement = self.replacement_dict[key][2]
            if alias and self._SINGLE_IMPORT_RE.fullmatch(replacement):
                replacement += alias
            if self.add_comments:
                out.write(self._comment(key))
            out.write(replacement + '\n')
            line_changes.append([dag_file, "Y", "Import_Change", line_number, line.rstrip(), replacement])
        return True

    def _migrate_calls(self, line, line_number, dag_file, out, line_changes):
        """Apply operator-rename and argument rules to one non-import line, then write it.

        Returns:
            The set of change types applied ("Operator_Change", "Argument_Change").
        """
        applied = set()
        # dict.fromkeys de-duplicates while keeping first-seen order, so a name called twice
        # on one line is migrated (and reported) once rather than renamed/extended twice.
        for rec in dict.fromkeys(re.findall(self.function_regex, line)):
            call_name = rec
            if rec in self.replacement_dict:
                old_line = line
                if self.add_comments:
                    out.write(self._comment(rec))
                call_name = self.replacement_dict[rec][2]
                line = self._rename_call(line, rec, call_name)
                line_changes.append(
                    [dag_file, "Y", "Operator_Change", line_number, old_line.rstrip(), line.rstrip()])
                applied.add("Operator_Change")
            arg_key = rec + "("
            if arg_key in self.replacement_dict:
                old_line = line
                if self.add_comments:
                    out.write(self._comment(arg_key))
                # call_name is the (possibly renamed) name now present on the line.
                line = self._add_argument(line, call_name, self.replacement_dict[arg_key][2])
                line_changes.append(
                    [dag_file, "Y", "Argument_Change", line_number, old_line.rstrip(), line.rstrip()])
                applied.add("Argument_Change")
        out.write(line)
        return applied

    def _comment(self, key):
        """Return the comment line written before a change made by rule *key*."""
        if self.comments:
            return '# ' + self.comments + '\n'
        rule = self.replacement_dict[key]
        return ('# Migration Utility Generated Comment -- Change Type = ' + rule[1] +
                " , Impact = " + rule[3] + '\n')

    @staticmethod
    def _rename_call(line, old_name, new_name):
        """Return *line* with every call ``old_name(`` renamed to ``new_name(``.

        Only call sites are renamed; the name inside string literals or longer
        identifiers (e.g. a task_id) is left alone.
        """
        pattern = r'(?<!\w)' + re.escape(old_name) + r'(?=\()'
        # A callable replacement stops backslashes in new_name from being treated as escapes.
        return re.sub(pattern, lambda _match: new_name, line)

    @staticmethod
    def _add_argument(line, call_name, argument):
        """Return *line* with *argument* added to the call of *call_name*.

        When the call's closing parenthesis is on *line*, the argument is
        inserted just before it (without a leading comma for an empty argument
        list). Otherwise the call is assumed to continue on later lines and the
        argument is added on a new line, indented four spaces deeper than
        *line* and followed by a comma. Quoted strings and a trailing ``#``
        comment are skipped while matching parentheses; strings spanning
        several lines are not understood.
        """
        close = None
        opening = re.search(r'(?<!\w)' + re.escape(call_name) + r'\(', line)
        if opening:
            depth = 0
            quote = None
            pos = opening.end() - 1  # index of the call's '('
            while pos < len(line):
                char = line[pos]
                if quote:
                    if char == '\\':
                        pos += 1  # skip the escaped character
                    elif char == quote:
                        quote = None
                elif char in '\'"':
                    quote = char
                elif char == '#':
                    break
                elif char == '(':
                    depth += 1
                elif char == ')':
                    depth -= 1
                    if depth == 0:
                        close = pos
                        break
                pos += 1
        elif line.rstrip().endswith(')'):
            # The call name is not on the line verbatim; fall back to the last ')'.
            close = line.rindex(')')
        if close is None:
            indent = ' ' * (len(line) - len(line.lstrip()) + 4)
            if not line.endswith('\n'):
                line += '\n'
            return line + indent + argument + ',\n'
        head = line[:close].rstrip()
        if head.endswith('('):
            joiner = ''
        elif head.endswith(','):
            joiner = ' '
        else:
            joiner = ', '
        return head + joiner + argument + line[close:]
def run_migration(input_dag, output_dag, rules_file, add_comments, comments, report_generation):
    """Load the rules file, then migrate every DAG in *input_dag* into *output_dag*.

    Thin entry point that builds a ``MigrationUtility`` from the given options
    and runs its two phases in order: ``load_rules`` then ``migrate_files``.
    """
    utility = MigrationUtility(
        input_dir=input_dag,
        output_dir=output_dag,
        rules_file=rules_file,
        add_comments=add_comments,
        comments=comments,
        report_generation=report_generation,
    )
    utility.load_rules()
    utility.migrate_files()