in composer_migration/src/composer_migration/lib/comparator.py [0:0]
def check_dag_files(self: T, strategy: DAGChecker) -> None:
    """Run ``strategy`` over every DAG file and record the operators it flags.

    Each entry of ``self.dags_list`` is checked by calling
    ``strategy.check_for_problem`` with the path
    ``f"{self.dags_directory}/{dag_file}"``. Every item found under the
    ``"nodes"`` key of a result is paired with the DAG file name it came
    from. The collected pairs are stored under the ``"operators"`` key of
    the dict returned by ``strategy.table_title_text()``, and that dict is
    appended to ``self.problem_operators``.

    If ``self.dags_list`` is empty (or ``None``), an error is logged and
    ``self.problem_operators`` is left untouched.

    Args:
        strategy: Checker object providing ``check_for_problem(path)`` and
            ``table_title_text()``.
    """
    if not self.dags_list:
        logging.error(
            "No dags to check, which means no problem operators were found."
        )
        return

    # (dag file name, flagged operator) pairs accumulated across all DAG files.
    problems: List[tuple] = []
    for dag_file in self.dags_list:
        output = strategy.check_for_problem(f"{self.dags_directory}/{dag_file}")
        # Iterating an empty "nodes" collection is a no-op, so no length
        # guard is needed before pairing.
        problems.extend((dag_file, operator) for operator in output["nodes"])

    # The table info is fetched once, after every file has been checked. The
    # dict returned by the strategy is mutated in place to carry the pairs.
    output_info = strategy.table_title_text()
    output_info["operators"] = problems
    logging.debug(output_info)
    self.problem_operators.append(output_info)
    logging.debug(self.problem_operators)