def check_dag_files()

in composer_migration/src/composer_migration/lib/comparator.py [0:0]


    def check_dag_files(self: T, strategy: DAGChecker) -> None:
        """Run ``strategy`` over every DAG file and record its problem operators.

        Each entry of ``self.dags_list`` is joined to ``self.dags_directory``
        with a ``/`` and passed to ``strategy.check_for_problem``. Every item
        under the ``"nodes"`` key of that result is collected as a
        ``(dag_file, operator)`` tuple. The collected tuples are then stored
        under the ``"operators"`` key of the mapping returned by
        ``strategy.table_title_text()``, and that mapping is appended to
        ``self.problem_operators``.

        If ``self.dags_list`` is empty (or otherwise falsy), an error is
        logged and nothing is appended.

        Args:
            strategy: Checker object providing ``check_for_problem(path)``
                and ``table_title_text()``.
        """
        if not self.dags_list:
            logging.error(
                "No dags to check, which means no problem operators were found."
            )
            return

        # (dag_file, operator) pairs gathered across all DAG files.
        problems: List[tuple] = []
        for dag_file in self.dags_list:
            result = strategy.check_for_problem(f"{self.dags_directory}/{dag_file}")
            # Pair each reported operator with the file it came from; an empty
            # "nodes" collection simply contributes nothing.
            problems.extend((dag_file, operator) for operator in result["nodes"])

        # Fetch the table info once, after every file has been checked.
        # NOTE(review): the mapping returned by table_title_text() is mutated
        # in place below; this assumes the strategy returns a fresh mapping on
        # each call -- confirm against the DAGChecker implementations.
        output_info = strategy.table_title_text()
        output_info["operators"] = problems
        logging.debug(output_info)
        self.problem_operators.append(output_info)
        logging.debug(self.problem_operators)