in core/lib/payload/copy.py [0:0]
def load_data(self):
    """Stage 3: load every dumped outfile chunk into the new table.

    Builds the column list for LOAD DATA INFILE (PK columns first, then
    non-PK columns — the order the chunks were dumped in), then loads
    chunks 1..outfile_suffix_end in sequence. For MyRocks tables,
    rocksdb bulk load and explicit commit are enabled around the load
    and always reverted afterwards, even if a chunk load fails.

    Raises:
        OSCError: ``OSC_INTERNAL_ERROR`` if both ``_pk_for_filter`` and
            ``old_non_pk_column_list`` are empty, which would mean a
            zero-column table (impossible in MySQL).

    Side effects:
        Records elapsed wall time in ``self.stats["time_in_load"]``
        (only when the load completes successfully).
    """
    stage_start_time = time.time()
    log.info("== Stage 3: Load data ==")
    # Generate the column name list string for load data infile.
    # The column sequence is not exactly the same as the original table:
    # it's pk_col_names + non_pk_col_names instead.
    if self._pk_for_filter:
        if self.old_non_pk_column_list:
            column_list = self._pk_for_filter + self.old_non_pk_column_list
        else:
            column_list = self._pk_for_filter
    elif self.old_non_pk_column_list:
        column_list = self.old_non_pk_column_list
    else:
        # It's impossible to reach here, otherwise it means there's zero
        # column in old table which MySQL doesn't support. Something is
        # totally wrong if we get to this point
        raise OSCError(
            "OSC_INTERNAL_ERROR",
            {
                "msg": "Unexpected scenario. Both _pk_for_filter "
                "and old_non_pk_column_list are empty"
            },
        )
    if self.is_myrocks_table:
        # Enable rocksdb bulk load before loading data
        self.change_rocksdb_bulk_load(enable=True)
        # Enable rocksdb explicit commit before loading data
        self.change_explicit_commit(enable=True)
    # Report progress after roughly every 10% of the chunks, but never
    # more often than every 5 chunks (small loads would otherwise log on
    # nearly every chunk). Hoisted out of the loop since it is invariant.
    progress_interval = max(5, self.outfile_suffix_end // 10)
    try:
        for suffix in range(1, self.outfile_suffix_end + 1):
            self.load_chunk(column_list, suffix)
            if suffix % progress_interval == 0:
                log.info(
                    "Load progress: {}/{} chunks".format(
                        suffix, self.outfile_suffix_end
                    )
                )
    finally:
        # Revert the session settings even when a chunk load raised, so
        # the connection is never left in bulk-load / explicit-commit
        # mode (the original code leaked these modes on failure).
        if self.is_myrocks_table:
            # Disable rocksdb bulk load after loading data
            self.change_rocksdb_bulk_load(enable=False)
            # Disable rocksdb explicit commit after loading data
            self.change_explicit_commit(enable=False)
    self.stats["time_in_load"] = time.time() - stage_start_time