in core/commands/direct.py [0:0]
def pre_run(self):
    """
    Run the sanity checks that must pass before the command executes.

    The checks run in this order, and the first failure aborts with an
    OSCError:

    1. every path in ``self.args.ddl_file_list`` is readable
       (``FAILED_TO_READ_DDL_FILE``); the list is then handed to the payload
    2. the payload can initialise its connection (``FAILED_TO_CONNECT_DB``)
    3. if ``self.args.repl_status`` is set, the payload's replication type
       check succeeds (``REPL_ROLE_MISMATCH``)
    4. the payload can fetch MySQL variables (``FAILED_TO_FETCH_MYSQL_VARS``)
    5. the payload reports no non-existent databases (``DB_NOT_EXIST``)
    """
    ddl_files = self.args.ddl_file_list

    # Bail out on the first DDL file that cannot be read
    for ddl_path in ddl_files:
        if util.is_file_readable(ddl_path):
            continue
        raise OSCError("FAILED_TO_READ_DDL_FILE", {"filepath": ddl_path})

    payload = self.payload
    payload.ddl_file_list = ddl_files

    # Make sure the payload can set up its database connection
    log.debug("Testing database connection")
    connected = payload.init_conn()
    if not connected:
        conn_info = {"user": payload.mysql_user, "socket": payload.socket}
        raise OSCError("FAILED_TO_CONNECT_DB", conn_info)

    # The replication role is only checked when one was given in the args
    log.debug("Verifying replication role")
    if self.args.repl_status and not payload.check_replication_type():
        role_info = {"given_role": payload.repl_status}
        raise OSCError("REPL_ROLE_MISMATCH", role_info)

    # Pull mysql variables from the server
    vars_fetched = payload.fetch_mysql_vars()
    if not vars_fetched:
        raise OSCError("FAILED_TO_FETCH_MYSQL_VARS")

    # Check database existence; anything returned here is reported as missing
    missing_dbs = payload.check_db_existence()
    if missing_dbs:
        raise OSCError("DB_NOT_EXIST", {"db_list": ", ".join(missing_dbs)})