def pre_run()

in core/commands/direct.py [0:0]


    def pre_run(self):
        """Run the pre-flight checks and abort on the first one that fails.

        The checks run in this order:

        1. Every path in ``self.args.ddl_file_list`` must be readable; the
           list is then copied onto ``self.payload.ddl_file_list``.
        2. ``self.payload.init_conn()`` must succeed.
        3. If ``self.args.repl_status`` is set,
           ``self.payload.check_replication_type()`` must succeed.
        4. ``self.payload.fetch_mysql_vars()`` must succeed.
        5. ``self.payload.check_db_existence()`` must report nothing missing.

        Raises:
            OSCError: with key ``FAILED_TO_READ_DDL_FILE``,
                ``FAILED_TO_CONNECT_DB``, ``REPL_ROLE_MISMATCH``,
                ``FAILED_TO_FETCH_MYSQL_VARS`` or ``DB_NOT_EXIST``, depending
                on which check failed.
        """
        # Step 1: reject the run as soon as one DDL file cannot be read.
        ddl_files = self.args.ddl_file_list
        for ddl_path in ddl_files:
            if util.is_file_readable(ddl_path):
                continue
            raise OSCError("FAILED_TO_READ_DDL_FILE", {"filepath": ddl_path})
        self.payload.ddl_file_list = ddl_files

        # Step 2: make sure the payload can open its database connection.
        log.debug("Testing database connection")
        connected = self.payload.init_conn()
        if not connected:
            conn_details = {
                "user": self.payload.mysql_user,
                "socket": self.payload.socket,
            }
            raise OSCError("FAILED_TO_CONNECT_DB", conn_details)

        # Step 3: only verify the replication role when one was requested.
        log.debug("Verifying replication role")
        if self.args.repl_status and not self.payload.check_replication_type():
            role_details = {"given_role": self.payload.repl_status}
            raise OSCError("REPL_ROLE_MISMATCH", role_details)

        # Step 4: load the server's MySQL variables.
        vars_fetched = self.payload.fetch_mysql_vars()
        if not vars_fetched:
            raise OSCError("FAILED_TO_FETCH_MYSQL_VARS")

        # Step 5: every database reported missing is listed in the error.
        missing_dbs = self.payload.check_db_existence()
        if missing_dbs:
            raise OSCError("DB_NOT_EXIST", {"db_list": ", ".join(missing_dbs)})