in core/lib/payload/copy.py [0:0]
def __init__(self, *args, **kwargs):
super(CopyPayload, self).__init__(*args, **kwargs)
self._current_db = None
self._pk_for_filter = []
self._idx_name_for_filter = "PRIMARY"
self._new_table = None
self._old_table = None
self._replayed_chg_ids = util.RangeChain()
self.select_chunk_size = 0
self.bypass_replay_timeout = False
self.is_ttl_disabled_by_me = False
self.stop_before_swap = False
self.outfile_suffix_end = 0
self.last_replayed_id = 0
self.last_checksumed_id = 0
self.table_size = 0
self.session_overrides = []
self._cleanup_payload = CleanupPayload(*args, **kwargs)
self.stats = {}
self.partitions = {}
self.eta_chunks = 1
self._last_kill_timer = None
self.table_swapped = False
self.repl_status = kwargs.get("repl_status", "")
self.outfile_dir = kwargs.get("outfile_dir", "")
# By specify this option we are allowed to open a long transaction
# during full table dump and full table checksum
self.allow_new_pk = kwargs.get("allow_new_pk", False)
self.allow_drop_column = kwargs.get("allow_drop_column", False)
self.detailed_mismatch_info = kwargs.get("detailed_mismatch_info", False)
self.dump_after_checksum = kwargs.get("dump_after_checksum", False)
self.eliminate_dups = kwargs.get("eliminate_dups", False)
self.rm_partition = kwargs.get("rm_partition", False)
self.force_cleanup = kwargs.get("force_cleanup", False)
self.skip_cleanup_after_kill = kwargs.get("skip_cleanup_after_kill", False)
self.pre_load_statement = kwargs.get("pre_load_statement", "")
self.post_load_statement = kwargs.get("post_load_statement", "")
self.replay_max_attempt = kwargs.get(
"replay_max_attempt", constant.DEFAULT_REPLAY_ATTEMPT
)
self.replay_timeout = kwargs.get(
"replay_timeout", constant.REPLAY_DEFAULT_TIMEOUT
)
self.replay_batch_size = kwargs.get(
"replay_batch_size", constant.DEFAULT_BATCH_SIZE
)
self.replay_group_size = kwargs.get(
"replay_group_size", constant.DEFAULT_REPLAY_GROUP_SIZE
)
self.skip_pk_coverage_check = kwargs.get("skip_pk_coverage_check", False)
self.pk_coverage_size_threshold = kwargs.get(
"pk_coverage_size_threshold", constant.PK_COVERAGE_SIZE_THRESHOLD
)
self.skip_long_trx_check = kwargs.get("skip_long_trx_check", False)
self.ddl_file_list = kwargs.get("ddl_file_list", "")
self.free_space_reserved_percent = kwargs.get(
"free_space_reserved_percent", constant.DEFAULT_RESERVED_SPACE_PERCENT
)
self.long_trx_time = kwargs.get("long_trx_time", constant.LONG_TRX_TIME)
self.max_running_before_ddl = kwargs.get(
"max_running_before_ddl", constant.MAX_RUNNING_BEFORE_DDL
)
self.ddl_guard_attempts = kwargs.get(
"ddl_guard_attempts", constant.DDL_GUARD_ATTEMPTS
)
self.lock_max_attempts = kwargs.get(
"lock_max_attempts", constant.LOCK_MAX_ATTEMPTS
)
self.lock_max_wait_before_kill_seconds = kwargs.get(
"lock_max_wait_before_kill_seconds",
constant.LOCK_MAX_WAIT_BEFORE_KILL_SECONDS,
)
self.session_timeout = kwargs.get(
"mysql_session_timeout", constant.SESSION_TIMEOUT
)
self.idx_recreation = kwargs.get("idx_recreation", False)
self.rocksdb_bulk_load_allow_sk = kwargs.get(
"rocksdb_bulk_load_allow_sk", False
)
self.unblock_table_creation_without_pk = kwargs.get(
"unblock_table_creation_without_pk", False
)
self.rebuild = kwargs.get("rebuild", False)
self.keep_tmp_table = kwargs.get("keep_tmp_table_after_exception", False)
self.skip_checksum = kwargs.get("skip_checksum", False)
self.skip_checksum_for_modified = kwargs.get(
"skip_checksum_for_modified", False
)
self.skip_delta_checksum = kwargs.get("skip_delta_checksum", False)
self.skip_named_lock = kwargs.get("skip_named_lock", False)
self.skip_affected_rows_check = kwargs.get("skip_affected_rows_check", False)
self.where = kwargs.get("where", None)
self.session_overrides_str = kwargs.get("session_overrides", "")
self.fail_for_implicit_conv = kwargs.get("fail_for_implicit_conv", False)
self.max_wait_for_slow_query = kwargs.get(
"max_wait_for_slow_query", constant.MAX_WAIT_FOR_SLOW_QUERY
)
self.max_replay_batch_size = kwargs.get(
"max_replay_batch_size", constant.MAX_REPLAY_BATCH_SIZE
)
self.allow_unsafe_ts_bootstrap = kwargs.get("allow_unsafe_ts_bootstrap", False)
self.is_full_table_dump = False
self.replay_max_changes = kwargs.get(
"replay_max_changes", constant.MAX_REPLAY_CHANGES
)
self.use_sql_wsenv = kwargs.get("use_sql_wsenv", False)
if self.use_sql_wsenv:
# by default, wsenv requires to use big chunk
self.chunk_size = kwargs.get("chunk_size", constant.WSENV_CHUNK_BYTES)
# by default, wsenv doesn't use local disk
self.skip_disk_space_check = kwargs.get("skip_disk_space_check", True)
# skip local disk space check when using wsenv
if not self.skip_disk_space_check:
raise OSCError("SKIP_DISK_SPACE_CHECK_VALUE_INCOMPATIBLE_WSENV")
# require outfile_dir not empty
if not self.outfile_dir:
raise OSCError("OUTFILE_DIR_NOT_SPECIFIED_WSENV")
else:
self.chunk_size = kwargs.get("chunk_size", constant.CHUNK_BYTES)
self.skip_disk_space_check = kwargs.get("skip_disk_space_check", False)