in core/lib/payload/copy.py [0:0]
def decide_pk_for_filter(self):
    """Decide which column set uniquely identifies a row in the old table.

    Populates:
      - ``self._pk_for_filter``: list of column names used to identify rows
      - ``self._pk_for_filter_def``: the corresponding column definitions
      - ``self._idx_name_for_filter``: set when a unique key is used instead
      - ``self.is_full_table_dump``: True when no chunk-safe key exists

    Raises:
        OSCError("NEW_PK"): the old table has neither a PK nor a UK and
            creating a new PK is not allowed.
    """
    # If we are adding a PK, then we should use all the columns in
    # old table to identify an unique row.
    # NOTE: use `and` (short-circuit) rather than all((pk, pk.column_list)),
    # because the tuple form dereferences `.column_list` eagerly and raises
    # AttributeError when the table has no primary key at all.
    if not (
        self._old_table.primary_key and self._old_table.primary_key.column_list
    ):
        # Let's try to get an UK if possible
        for idx in self._old_table.indexes:
            if idx.is_unique:
                log.info(
                    "Old table doesn't have a PK but has an UK: {}".format(idx.name)
                )
                self._pk_for_filter = [col.name for col in idx.column_list]
                self._pk_for_filter_def = idx.column_list.copy()
                self._idx_name_for_filter = idx.name
                break
        else:
            # There's no UK either
            if self.allow_new_pk:
                # Fall back to every column; this forces a full table dump
                # because no subset is guaranteed unique.
                self._pk_for_filter = [
                    col.name for col in self._old_table.column_list
                ]
                self._pk_for_filter_def = self._old_table.column_list.copy()
                self.is_full_table_dump = True
            else:
                raise OSCError("NEW_PK")
    # If we have PK in existing schema, then we use current PK as an unique
    # row finder
    else:
        # if any of the columns of the primary key is prefixed, we want to
        # use full_table_dump, instead of chunking, so that it doesn't fill
        # up the disk
        # e.g. name below is a prefixed col in the PK (assume varchar(99))
        # since we dont use full col in PK - `PRIMARY KEY(id, name(10))`
        for col in self._old_table.primary_key.column_list:
            if col.length:
                log.info(
                    "Found prefixed column/s as part of the PK. "
                    "Will do full table dump (no chunking)."
                )
                self._pk_for_filter = [c.name for c in self._old_table.column_list]
                self._pk_for_filter_def = self._old_table.column_list.copy()
                self.is_full_table_dump = True
                break
        else:
            # Clean PK (no prefixed columns): chunk using the PK itself.
            self._pk_for_filter = [
                col.name for col in self._old_table.primary_key.column_list
            ]
            self._pk_for_filter_def = self._old_table.primary_key.column_list.copy()