tools/airflow-v1-to-v2-migration/misc/migration.py (141 lines of code) (raw):
# Copyright 2022 Google LLC. This software is provided as is, without warranty
# or representation for any use or purpose.
# Your use of it is subject to your agreement with Google.
import csv
import re
import os
def clean_input(arg):
    """Return ``arg`` trimmed of surrounding whitespace with all newlines removed."""
    # Splitting on '\n' and re-joining with nothing drops every newline.
    return ''.join(arg.strip().split('\n'))
def parse_import_statement(import_statement):
    """Split a one-line import statement into its module and imported names.

    The statement must start at column 0. Both ``from m import a, b as c`` and
    ``import a, b`` are accepted, as is a single-line parenthesised name list.

    Returns:
        ``(module, names)``. ``module`` is the ``from`` module, or ``None``
        for a plain ``import``. ``names`` lists the imported names with any
        ``as`` alias dropped. ``(None, None)`` is returned when the text is
        not such an import.
    """
    found = re.search(
        r'^(?:from\s+([\w.]+)\s+)?import\s+(?:\(([^)]+)\)|([\w., ]+))$',
        import_statement)
    if not found:
        return None, None
    module, parenthesised, bare = found.groups()
    names_text = parenthesised or bare
    if not names_text:
        return module, names_text
    # Capture only the name; an optional "as alias" suffix is consumed and discarded.
    return module, re.findall(r'([\w.]+)(?:\s+as\s+[\w.]+)?', names_text)
class MigrationUtility:
    """Copy Airflow v1 DAG files into an output tree, applying CSV-driven rewrites.

    ``replacement_dict`` maps a rule key (CSV column 2) to a tuple built from
    CSV columns 0, 1, 3, 4, 5 and 6. The rewrite code uses the tuple indexes
    as follows:

    * Index 1 is the change type and index 3 is the impact. Both are only
      echoed into generated comments.
    * Index 2 is the replacement text.
    * Indexes 4 and 5 are compared with the literal strings ``"TRUE"`` /
      ``"FALSE"`` for "Argument Replace" rules.
    * Index 0 is stored but not read here.
    """

    # Leading "from <module> import " or "import " of a one-line import; the
    # same prefix grammar that parse_import_statement() accepts.
    _IMPORT_PREFIX_RE = re.compile(r'(?:from\s+[\w.]+\s+)?import\s+')
    # One imported item, including its optional "as <alias>" suffix.
    _IMPORT_ITEM_RE = re.compile(r'[\w.]+(?:\s+as\s+[\w.]+)?')

    def __init__(self, input_dir, output_dir, rules_file, add_comments, comments, report_generation):
        """Store the migration settings; call :meth:`load_rules` before migrating.

        Args:
            input_dir: Root directory that is walked for ``.py`` files.
            output_dir: Root directory that receives the rewritten copies.
            rules_file: Path of the CSV file holding the replacement rules.
            add_comments: If truthy, a ``#`` comment is written above each change.
            comments: Custom comment text. When empty, a generated comment
                naming the rule's change type and impact is used instead.
            report_generation: If truthy, print a summary of the changed files.
        """
        self.replacement_dict = {}
        self.rules_file = rules_file
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.add_comments = add_comments
        self.comments = comments
        self.report_generation = report_generation
        # Captures dotted call targets, e.g. ``foo`` in ``foo(`` or ``a.b.Foo``.
        self.function_regex = r'(\w+(?:\.\w+)*)\('

    def load_rules(self):
        """Load the rules from ``self.rules_file`` into ``replacement_dict``.

        Every non-blank CSV row is a rule and needs at least seven columns.
        A later row with the same key replaces an earlier one.
        """
        # newline='' is what the csv module requires to parse quoted fields correctly.
        with open(self.rules_file, 'r', newline='') as f:
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields [] for blank lines; they carry no rule.
                    continue
                self.replacement_dict[row[2].strip()] = (
                    clean_input(row[0]), clean_input(row[1]), clean_input(row[3]),
                    clean_input(row[4]), clean_input(row[5]), clean_input(row[6]))

    @staticmethod
    def print_report(impacted_files):
        """Print a console report listing the impacted files, one per line."""
        # temp method to print report on console , will modify this in phase 2
        print("#" * 40)
        print(" REPORT GENERATION ")
        print("#" * 40)
        print()
        print('Below is the list of impacted files')
        print(f'Total number of impacted files is - {len(impacted_files)}')
        # Indent every entry alike (joining with '\n ' left the first one bare).
        print('\n'.join(' ' + path for path in impacted_files))
        # Conclusion
        print("#" * 40)
        print(" END OF REPORT ")
        print("#" * 40)

    def migrate_files(self, comment_flag, comment):
        """Migrate every ``.py`` file under ``input_dir`` into ``output_dir``.

        The directory layout below ``input_dir`` is mirrored under
        ``output_dir``, and only ``.py`` files are written. Rewrites come from
        ``replacement_dict``, so call :meth:`load_rules` first. When
        ``report_generation`` is set, the files that changed are printed as
        paths relative to ``input_dir``.

        Args:
            comment_flag: Unused; ``self.add_comments`` controls comment output.
                Kept so existing callers keep working.
            comment: Unused; ``self.comments`` supplies custom comment text.
                Kept so existing callers keep working.
        """
        impacted_files = []
        for root, _dirs, files in os.walk(self.input_dir):
            # relpath copes with or without a trailing separator on input_dir.
            rel_dir = os.path.relpath(root, self.input_dir)
            new_dir = os.path.normpath(os.path.join(self.output_dir, rel_dir))
            for file in files:
                if not file.endswith(".py"):
                    continue
                os.makedirs(new_dir, exist_ok=True)
                changes = self._migrate_file(os.path.join(root, file),
                                             os.path.join(new_dir, file))
                if changes > 0:
                    impacted_files.append(os.path.normpath(os.path.join(rel_dir, file)))
        if self.report_generation:
            self.print_report(impacted_files)

    def _migrate_file(self, src_path, dst_path):
        """Rewrite ``src_path`` into ``dst_path`` and return the number of changes made."""
        change_count = 0
        with open(src_path, 'r', encoding='utf-8') as src, \
                open(dst_path, 'w', encoding='utf-8') as dst:
            for line in src:
                new_line, changes = self._migrate_line(line, dst)
                change_count += changes
                dst.write(new_line)
        return change_count

    def _migrate_line(self, line, out):
        """Return ``(rewritten_text, change_count)`` for one source line.

        Generated comments are written to ``out`` right away, so they land
        directly above the rewritten text.
        """
        # Comment lines, at any indentation, are copied through untouched.
        # TODO: identify multi-line strings/docstrings and skip them as well.
        if line.lstrip().startswith('#'):
            return line, 0
        mod_name, imported_names = parse_import_statement(line)
        # Test the names, not the module: a plain "import x" has mod_name None.
        if imported_names is not None:
            return self._rewrite_import(line, mod_name, imported_names, out)
        return self._rewrite_code(line, out)

    def _rewrite_import(self, line, mod_name, imported_names, out):
        """Rewrite a one-line import and return ``(new_text, change_count)``.

        Each imported name is looked up as a single-name statement
        (``from <mod> import <name>`` or ``import <name>``). Each matched name
        becomes the rule's replacement text on its own line. Unmatched names
        stay in an import from the original module, keeping any ``as`` alias.
        A line with no matches is returned unchanged.
        """
        prefix = f'from {mod_name} import ' if mod_name else 'import '
        # Re-read the items with their aliases so kept names are reproduced verbatim.
        names_text = line[self._IMPORT_PREFIX_RE.match(line).end():]
        items = self._IMPORT_ITEM_RE.findall(names_text.strip().strip('()'))
        kept = []
        replacements = []
        for name, item in zip(imported_names, items):
            rule = self.replacement_dict.get(prefix + name)
            if rule is None:
                kept.append(item)
                continue
            self._write_comment(out, rule)
            replacements.append(rule[2] + '\n')
        changes = len(replacements)
        if not changes:
            return line, 0
        if kept:
            replacements.insert(0, prefix + ', '.join(kept) + '\n')
        return ''.join(replacements), changes

    def _rewrite_code(self, line, out):
        """Apply call and argument rules to a non-import line; return ``(text, changes)``."""
        changes = 0
        for name in re.findall(self.function_regex, line):
            rule = self.replacement_dict.get(name)
            if rule is not None:
                changes += 1
                self._write_comment(out, rule)
                line = line.replace(name, rule[2])
            # Rules keyed "<name>(" or "<name> (" append an extra argument line
            # after the call's opening line.
            arg_rule = self.replacement_dict.get(name + "(")
            if arg_rule is None:
                arg_rule = self.replacement_dict.get(name + " (")
            if arg_rule is not None:
                changes += 1
                self._write_comment(out, arg_rule)
                indent = ' ' * (len(line) - len(line.lstrip()) + 4)
                line = line + indent + arg_rule[2] + ",\n"
        for word, rule in self.replacement_dict.items():
            if (rule[1] == "Argument Replace" and rule[4] == "TRUE"
                    and rule[5] == "FALSE" and word in line):
                changes += 1
                self._write_comment(out, rule)
                line = line.replace(word, rule[2])
                # Only the first matching argument rule is applied per line.
                break
        return line, changes

    def _write_comment(self, out, rule):
        """Write the migration comment for ``rule`` to ``out`` when comments are enabled."""
        if not self.add_comments:
            return
        if self.comments:
            out.write('# ' + self.comments + '\n')
        else:
            out.write('# Migration Utility Generated Comment -- Change Type = ' +
                      rule[1] + " , Impact = " + rule[3] + '\n')
def run_migration(input_dag, output_dag, rules_file, add_comments, comments, report_generation):
    """Load the rules from ``rules_file`` and migrate the DAG files under ``input_dag``.

    Rewritten copies go under ``output_dag``. ``add_comments``, ``comments``
    and ``report_generation`` are forwarded unchanged to ``MigrationUtility``.
    """
    utility = MigrationUtility(
        input_dir=input_dag,
        output_dir=output_dag,
        rules_file=rules_file,
        add_comments=add_comments,
        comments=comments,
        report_generation=report_generation,
    )
    utility.load_rules()
    utility.migrate_files(add_comments, comments)