in core/lib/payload/copy.py [0:0]
def checksum_by_chunk(self, table_name, dump_after_checksum=False):
    """
    Run a checksum over all the existing data in the given table, chunk
    by chunk. This is to make sure there's no data corruption after the
    initial load and the first round of replay
    """
    checksum_result = []
    # Checksum by chunk. This is pretty much the same chunking logic as
    # we've used in select_table_into_outfile
    # affected_rows seeds the loop; it is overwritten with each chunk's
    # row count, and a count of 0 terminates the loop
    affected_rows = 1
    # The first chunk starts from the beginning of the index, so it needs
    # no WHERE clause to bound the range
    use_where = False
    outfile_id = 0
    if table_name == self.new_table_name:
        # For the new table, pick an index that covers all the columns
        # we are about to checksum
        idx_for_checksum = self.find_coverage_index()
        outfile_prefix = "{}.new".format(self.outfile)
    else:
        # For the old table, reuse the index already chosen for range
        # filtering
        idx_for_checksum = self._idx_name_for_filter
        outfile_prefix = "{}.old".format(self.outfile)
    while affected_rows:
        checksum = self.query(
            sql.checksum_by_chunk(
                table_name,
                self.checksum_column_list,
                self._pk_for_filter,
                self.range_start_vars_array,
                self.range_end_vars_array,
                self.select_chunk_size,
                use_where,
                idx_for_checksum,
            )
        )
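        # Hedged sketch of the statement the helper above presumably emits
        # (the real SQL lives in sql.checksum_by_chunk; the session
        # variables and the BIT_XOR/CRC32 aggregation shown here are
        # assumptions for illustration, only "cnt" is confirmed below):
        #
        #   SELECT COUNT(*) AS cnt,
        #          BIT_XOR(CRC32(CONCAT_WS(',', col1, col2, ...))) AS crc
        #   FROM `table_name` FORCE INDEX (`idx_for_checksum`)
        #   WHERE pk > @range_start ...   -- only when use_where is True
        #   LIMIT select_chunk_size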
        # Dump the data onto local disk for further investigation.
        # This is very helpful when there's a reproducible checksum
        # mismatch issue
        if dump_after_checksum:
            self.execute_sql(
                sql.dump_current_chunk(
                    table_name,
                    self.checksum_column_list,
                    self._pk_for_filter,
                    self.range_start_vars_array,
                    self.select_chunk_size,
                    idx_for_checksum,
                    use_where,
                ),
                ("{}.{}".format(outfile_prefix, str(outfile_id)),),
            )
            outfile_id += 1
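            # Each dumped chunk lands in its own numbered file, e.g.
            # "<outfile>.new.0", "<outfile>.new.1", ..., so a mismatching
            # chunk can be diffed between the old and new table dumps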
        # Refresh the WHERE condition range for the next select
        if checksum:
            self.refresh_range_start()
            affected_rows = checksum[0]["cnt"]
            checksum_result.append(checksum[0])
            # Every chunk after the first is bounded by a WHERE range
            use_where = True
    return checksum_result
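
# Illustrative usage sketch (not part of the original file): a caller might
# checksum both copies of the table and compare the per-chunk results. The
# `copy_payload` object and its `table_name` attribute are assumptions for
# this sketch; the real call sites live elsewhere in the copy workflow.
#
#   old_chunks = copy_payload.checksum_by_chunk(copy_payload.table_name)
#   new_chunks = copy_payload.checksum_by_chunk(copy_payload.new_table_name)
#   for old_chunk, new_chunk in zip(old_chunks, new_chunks):
#       if old_chunk != new_chunk:
#           raise RuntimeError(
#               "checksum mismatch: {} vs {}".format(old_chunk, new_chunk))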