def generate_sql()

in migration_toolkit/sql_generators/copy_rows/copy_rows.py [0:0]


  def generate_sql(self):
    source_columns = []
    destination_columns = []
    for (
        column_name,
        source_type,
    ) in self.source_ddl_parser.get_schema().items():
      destination_type = self.destination_ddl_parser.get_schema().get(
          column_name
      )

      if not destination_type:
        raise ValueError(
            "Column names must match in source and destination, but could not"
            f" find column name {column_name} in destination table. Destination"
            f" schema is {self.destination_ddl_parser.get_schema()}"
        )
      column_name = f"`{column_name}`"

      destination_columns.append(column_name)

      if source_type == destination_type:
        logger.debug(f"Type match for column '{column_name}'")
        source_columns.append(column_name)
      else:
        logger.debug(
            f"Type mismatch for column '{column_name}': {source_type} == >"
            f" {destination_type}"
        )
        source_columns.append(
            COLUMN_SCHEMAS_TO_CAST_EXPRESSION[
                ColumnSchema(source_type, destination_type)
            ].format(column_name=column_name)
        )

    sql = COPY_DATA_SQL.format(
        destination_table=self.destination_ddl_parser.get_fully_qualified_table_name(),
        source_columns=",\n  ".join(source_columns),
        destination_columns=",\n  ".join(destination_columns),
        source_table=self.source_ddl_parser.get_fully_qualified_table_name(),
    )
    logger.info(f"Generated copy rows SQL statement:\n'{sql}'")
    self._write_to_file(sql)