in migration_toolkit/sql_generators/copy_rows/copy_rows.py [0:0]
def generate_sql(self) -> None:
    """Build the copy-rows SQL statement and pass it to ``_write_to_file``.

    Every column of the source schema is looked up by name in the
    destination schema. Column names are wrapped in backticks. A column whose
    source and destination types are equal is selected as-is; otherwise the
    select expression comes from the ``COLUMN_SCHEMAS_TO_CAST_EXPRESSION``
    template registered for that (source type, destination type) pair.
    The resulting column lists and both fully qualified table names are
    substituted into ``COPY_DATA_SQL``; the statement is logged and written
    out via ``self._write_to_file``.

    Raises:
        ValueError: A source column has no (truthy) type entry under the
            same name in the destination schema.
        KeyError: No cast expression is registered for a mismatching
            (source type, destination type) pair.
    """
    source_schema = self.source_ddl_parser.get_schema()
    # Fetched once instead of once per column; assumes get_schema() returns
    # the same mapping on every call -- TODO confirm against the parser.
    destination_schema = self.destination_ddl_parser.get_schema()

    source_columns = []
    destination_columns = []
    for column_name, source_type in source_schema.items():
        destination_type = destination_schema.get(column_name)
        if not destination_type:
            raise ValueError(
                "Column names must match in source and destination, but could not"
                f" find column name {column_name} in destination table. Destination"
                f" schema is {destination_schema}"
            )

        # From here on only the backtick-quoted form is used, both in the
        # generated SQL and in the log messages.
        quoted_name = f"`{column_name}`"
        destination_columns.append(quoted_name)

        if source_type == destination_type:
            logger.debug(f"Type match for column '{quoted_name}'")
            source_columns.append(quoted_name)
            continue

        logger.debug(
            f"Type mismatch for column '{quoted_name}': {source_type} == >"
            f" {destination_type}"
        )
        cast_key = ColumnSchema(source_type, destination_type)
        try:
            cast_template = COLUMN_SCHEMAS_TO_CAST_EXPRESSION[cast_key]
        except KeyError as err:
            # Still a KeyError so existing handlers keep working, but now the
            # message identifies the offending column and both types.
            raise KeyError(
                f"No cast expression is defined for column {quoted_name}"
                f" from type {source_type} to type {destination_type}"
            ) from err
        source_columns.append(cast_template.format(column_name=quoted_name))

    sql = COPY_DATA_SQL.format(
        destination_table=self.destination_ddl_parser.get_fully_qualified_table_name(),
        source_columns=",\n ".join(source_columns),
        destination_columns=",\n ".join(destination_columns),
        source_table=self.source_ddl_parser.get_fully_qualified_table_name(),
    )
    logger.info(f"Generated copy rows SQL statement:\n'{sql}'")
    self._write_to_file(sql)