in tools/cloud-composer-migration-complexity-assessment/airflow-v1-to-v2-migration/airflow_migration/migration.py [0:0]
def migrate_files(self):
# Args for summary report
impacted_files = []
impacted_imp_files = []
impacted_imp_op_files = []
impacted_imp_op_arg_files = []
total_num_dag = 0
imp_num = 0
imp_op_num = 0
imp_op_arg_num = 0
# Args for detailed report
line_changes = []
for filepath in glob.iglob(f"{self.input_dir}/*.py", recursive=True):
total_num_dag += 1
imp_change = False
imp_op_change = False
imp_op_arg_change = False
change_count = 0
filename = Path(filepath).stem
new_file = f"{self.output_dir}/{filename}_v2.py"
# Iterate through each file and open new v2 file
with open(filepath, 'r') as f, open(new_file, 'w') as temp:
for line_number, line in enumerate(f, start=1):
# check if a comment
# add feature to identify multi-line comments and ignore
if line.startswith('#'):
temp.write(line)
continue
# check if this is an import statement
mod_name, imported_names = parse_import_statement(line)
if mod_name is not None:
for idx, rec in enumerate(imported_names):
imp_stmt = ''
if mod_name:
imp_stmt = 'from ' + mod_name + ' '
imp_stmt += 'import ' + rec
if imp_stmt in self.replacement_dict:
imp_change = True
change_count += 1
old_line = line
if self.add_comments:
if self.comments:
comment = '# ' + self.comments + '\n'
else:
comment = '# Migration Utility Generated Comment -- Change Type = ' + \
self.replacement_dict[imp_stmt][1] + " , Impact = " + \
self.replacement_dict[imp_stmt][3] + '\n'
temp.write(comment)
temp.write(self.replacement_dict[imp_stmt][2] + '\n')
line_changes.append(
[filename + ".py", "Y", "Import_Change", line_number, old_line.rstrip(), self.replacement_dict[imp_stmt][2]])
else:
temp.write(imp_stmt + '\n')
else:
# extract Operator Name from the current line
matches = re.findall(self.function_regex, line)
if matches:
# Iterate over all the operator name and check if that matches with any of the rules in
# rule dict
for rec in matches:
if rec in self.replacement_dict:
old_line = line
change_count += 1
imp_change = False
imp_op_change = True
if self.add_comments:
if self.comments:
comment = '# ' + self.comments + '\n'
else:
comment = '# Migration Utility Generated Comment -- Change Type = ' + \
self.replacement_dict[rec][1] + " , Impact = " + \
self.replacement_dict[rec][3] + '\n'
temp.write(comment)
line = line.replace(rec, self.replacement_dict[rec][2])
line_changes.append(
[filename + ".py", "Y", "Operator_Change", line_number, old_line.rstrip(), line.rstrip()])
# Argument Changes - Check for a rule in rules dict with Operator name+( e.g.
# BigQueryOperator(
if rec + "(" in self.replacement_dict:
old_line = line
imp_change = False
imp_op_change = False
imp_op_arg_change = True
space_count = len(line) - len(line.lstrip())
space_count += 4
if self.add_comments:
if self.comments:
comment = '# ' + self.comments + '\n'
else:
comment = '# Migration Utility Generated Comment -- Change Type = ' + \
self.replacement_dict[rec + "("][1] + " , Impact = " + \
self.replacement_dict[rec + "("][3] + '\n'
temp.write(comment)
# Truncate the new line character and hold in temp variable to check if line
# ends with ")" to identify if operator call is in single line if it is single
# line operator function execute the if statement and add argument in the
# current line itself else add a new line with argument details from rule dict
truncatedline = line.strip()
if truncatedline.endswith(")"):
line = line.replace(")", "," + self.replacement_dict[rec + "("][2] + ")")
line_changes.append(
[filename + ".py", "Y", "Argument_Change", line_number, old_line, line])
print(line_changes)
else:
line = line + ' ' * space_count + self.replacement_dict[rec + "("][
2] + ",\n"
line_changes.append(
[filename + ".py", "Y", "Argument_Change", line_number, old_line.rstrip(),
line.rstrip()])
temp.write(line)
if change_count > 0:
impacted_files.append(filename + '.py')
# Append filename with an import change
if imp_change:
imp_num += 1
impacted_imp_files.append(filename + '.py')
# Append filename with an operator change
if imp_op_change:
imp_op_num += 1
impacted_imp_op_files.append(filename + '.py')
# Append filename with an argument change
if imp_op_arg_change:
imp_op_arg_num += 1
impacted_imp_op_arg_files.append(filename + '.py')
self.generate_summary_report(total_num_dag, imp_num + imp_op_num + imp_op_arg_num, imp_num, imp_op_num,
imp_op_arg_num, impacted_imp_files, impacted_imp_op_files,
impacted_imp_op_arg_files)
# Append all filenames when is a change
if self.report_generation:
self.generate_detailed_report(line_changes)