// sources/mysql/mysqldump.go
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/opcode"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
)
var valuesRegexp = regexp.MustCompile("\\((.*?)\\)")
var insertRegexp = regexp.MustCompile("INSERT\\sINTO\\s(.*?)\\sVALUES\\s")
var unsupportedRegexp = regexp.MustCompile("function|procedure|trigger")
var dbcollationRegex = regexp.MustCompile("_[_A-Za-z0-9]+('([^']*)')")
// MysqlSpatialDataTypes is an array of all MySQL spatial data types.
var MysqlSpatialDataTypes = []string{"geometrycollection", "multipoint", "multilinestring", "multipolygon", "point", "linestring", "polygon", "geometry"}
var spatialRegexps = func() []*regexp.Regexp {
l := make([]*regexp.Regexp, len(MysqlSpatialDataTypes))
for i, spatial := range MysqlSpatialDataTypes {
l[i] = regexp.MustCompile("(?i) " + spatial)
}
return l
}()
var spatialIndexRegex = regexp.MustCompile("(?i)\\sSPATIAL\\s")
var spatialSridRegex = regexp.MustCompile("(?i)\\sSRID\\s\\d*")
// DbDumpImpl is the MySQL-specific implementation of the common.DbDump interface.
type DbDumpImpl struct{}
// GetToDdl implements the common.DbDump interface, returning the MySQL-specific ToDdl implementation.
func (ddi DbDumpImpl) GetToDdl() common.ToDdl {
return ToDdlImpl{}
}
// ProcessDump processes the mysql dump.
func (ddi DbDumpImpl) ProcessDump(conv *internal.Conv, r *internal.Reader) error {
return processMySQLDump(conv, r)
}
// processMySQLDump reads mysqldump data from r and does schema or data conversion,
// depending on whether conv is configured for schema mode or data mode.
// In schema mode, processMySQLDump incrementally builds a schema (updating conv).
// In data mode, processMySQLDump uses this schema to convert MySQL data
// and writes it to Spanner, using the data sink specified in conv.
func processMySQLDump(conv *internal.Conv, r *internal.Reader) error {
for {
startLine := r.LineNumber
startOffset := r.Offset
b, stmts, err := readAndParseChunk(conv, r)
if err != nil {
return err
}
for _, stmt := range stmts {
isInsert := processStatement(conv, stmt)
internal.VerbosePrintf("Parsed SQL command at line=%d/fpos=%d: %d stmts (%d lines, %d bytes) Insert Statement=%v\n", startLine, startOffset, 1, r.LineNumber-startLine, len(b), isInsert)
logger.Log.Debug(fmt.Sprintf("Parsed SQL command at line=%d/fpos=%d: %d stmts (%d lines, %d bytes) Insert Statement=%v\n", startLine, startOffset, 1, r.LineNumber-startLine, len(b), isInsert))
}
if r.EOF {
break
}
}
internal.ResolveForeignKeyIds(conv.SrcSchema)
return nil
}
// readAndParseChunk parses a chunk of mysqldump data, returning the bytes read,
// the parsed statements (nil if nothing was parsed), and any error encountered.
// In effect, we proceed through the file, statement by statement. Many
// statements (e.g. DDL statements) are small, but insert statements can
// be large. Fortunately mysqldump limits the size of insert statements
// (default is 24MB, but configurable via --max-allowed-packet), and so
// the chunks of file we read/parse are manageable, even for mysqldump
// files containing tens or hundreds of GB of data.
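//
// For example (an illustrative multi-line statement):
//
//   CREATE TABLE cart (
//       id INT
//   );
//
// No parse is attempted for the first two lines (they contain no ';');
// once the closing ');' line is read, the accumulated chunk parses as a
// single statement.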
func readAndParseChunk(conv *internal.Conv, r *internal.Reader) ([]byte, []ast.StmtNode, error) {
var l [][]byte
// Regex for ignoring strings of the form /*!50717 SELECT COUNT(*) INTO @rocksdb_has_p_s_session_variables FROM INFORMATION_SCHEMA.TABLES */;
// These system-generated SQL statements are currently not supported by the parser and return an error.
// Pingcap Issue : https://github.com/pingcap/parser/issues/1370
regexExp := regexp.MustCompile(`^(\/\*[!0-9\s]*SELECT[^\n]*INTO[\s]+@[^\n]*\*\/;\n)$`)
for {
b := r.ReadLine()
l = append(l, b)
// If we see a semicolon or eof, we're likely to have a command, so try to parse it.
// Note: we could just parse every iteration, but that would mean more attempts at parsing.
if strings.Contains(string(b), ";") || r.EOF {
n := 0
for i := range l {
n += len(l[i])
}
s := make([]byte, n)
n = 0
for i := range l {
n += copy(s[n:], l[i])
}
chunk := string(s)
if regexExp.MatchString(chunk) {
fmt.Printf("\nParsing skipped for: %s\n", chunk)
return s, nil, nil
}
tree, _, err := parser.New().Parse(chunk, "", "")
if err == nil {
return s, tree, nil
}
newTree, ok := handleParseError(conv, chunk, err, l)
if ok {
return s, newTree, nil
}
// Likely causes of failing to parse:
// a) complex statements with embedded semicolons e.g. 'CREATE FUNCTION'
// b) a semicolon embedded in a multi-line comment, or
// c) a semicolon embedded in a string constant or column/table name.
// We deal with this case by reading another line and trying again.
conv.Stats.Reparsed++
}
if r.EOF {
return nil, nil, fmt.Errorf("error parsing last %d line(s) of input", len(l))
}
}
}
// processStatement extracts schema information from a MySQL
// statement, updating conv with new schema information, and returns
// true if an INSERT statement is encountered.
func processStatement(conv *internal.Conv, stmt ast.StmtNode) bool {
switch s := stmt.(type) {
case *ast.CreateTableStmt:
if conv.SchemaMode() {
processCreateTable(conv, s)
}
case *ast.AlterTableStmt:
if conv.SchemaMode() {
processAlterTable(conv, s)
}
case *ast.SetStmt:
if conv.SchemaMode() {
processSetStmt(conv, s)
}
case *ast.InsertStmt:
processInsertStmt(conv, s)
return true
case *ast.CreateIndexStmt:
if conv.SchemaMode() {
processCreateIndex(conv, s)
}
default:
conv.SkipStatement(NodeType(stmt))
}
return false
}
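// processCreateIndex adds an index to the schema of an existing source
// table, mapping a CREATE [UNIQUE] INDEX statement to a schema.Index.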
func processCreateIndex(conv *internal.Conv, stmt *ast.CreateIndexStmt) {
if stmt.Table == nil {
logStmtError(conv, stmt, fmt.Errorf("cannot process index statement with nil table"))
return
}
tableName, err := getTableName(stmt.Table)
if err != nil {
logStmtError(conv, stmt, fmt.Errorf("can't get table name: %w", err))
return
}
if tbl, ok := internal.GetSrcTableByName(conv.SrcSchema, tableName); ok {
ctable := conv.SrcSchema[tbl.Id]
ctable.Indexes = append(ctable.Indexes, schema.Index{
Id: internal.GenerateIndexesId(),
Name: stmt.IndexName,
Unique: (stmt.KeyType == ast.IndexKeyTypeUnique),
Keys: toSchemaKeys(stmt.IndexPartSpecifications, tbl.ColNameIdMap),
})
conv.SrcSchema[tbl.Id] = ctable
} else {
conv.Unexpected(fmt.Sprintf("Table %s not found while processing index statement", tableName))
conv.SkipStatement(NodeType(stmt))
}
}
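// processSetStmt records the TIME_ZONE setting from a SET statement;
// all other session variables are ignored.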
func processSetStmt(conv *internal.Conv, stmt *ast.SetStmt) {
if len(stmt.Variables) > 0 {
for _, variable := range stmt.Variables {
if variable.Name == "TIME_ZONE" {
value := variable.Value
switch val := value.(type) {
case *driver.ValueExpr:
if val.GetValue() == nil {
logStmtError(conv, stmt, fmt.Errorf("found nil value in 'SET TIME_ZONE' statement"))
return
}
conv.TimezoneOffset = fmt.Sprintf("%v", val.GetValue())
default:
// mysqldump saves the value of TIME_ZONE (in OLD_TIME_ZONE) at
// the start of the dump, changes TIME_ZONE, dumps table schema
// and data, and then restores TIME_ZONE using OLD_TIME_ZONE at the
// end of the dump file. We track the setting of TIME_ZONE, but
// ignore the restore statements.
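// A typical dump (illustrative) contains:
//   /*!40103 SET TIME_ZONE='+00:00' */;        -- start of dump: sets conv.TimezoneOffset
//   /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;  -- end of dump: ignored here
// The restore statement's value is a variable reference rather than a
// *driver.ValueExpr, so it falls through to this default case.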
return
}
}
}
}
}
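// processCreateTable builds a schema.Table (columns, primary keys, foreign
// keys, indexes and check constraints) from a CREATE TABLE statement and
// registers it in conv.SrcSchema.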
func processCreateTable(conv *internal.Conv, stmt *ast.CreateTableStmt) {
if stmt.Table == nil {
logStmtError(conv, stmt, fmt.Errorf("table is nil"))
return
}
tableId := internal.GenerateTableId()
tableName, err := getTableName(stmt.Table)
if err != nil {
logStmtError(conv, stmt, fmt.Errorf("can't get table name: %w", err))
return
}
internal.VerbosePrintf("processing create table elem=%s stmt=%v\n", tableName, stmt)
logger.Log.Debug(fmt.Sprintf("processing create table elem=%s stmt=%v\n", tableName, stmt))
var colIds []string
colDef := make(map[string]schema.Column)
colNameIdMap := make(map[string]string)
var keys []schema.Key
var fkeys []schema.ForeignKey
var index []schema.Index
checkConstraints := getCheckConstraints(stmt.Constraints)
for _, element := range stmt.Cols {
_, col, constraint, err := processColumn(conv, tableName, element)
if err != nil {
logStmtError(conv, stmt, err)
return
}
col.Id = internal.GenerateColumnId() // assign a new column id
colDef[col.Id] = col
colIds = append(colIds, col.Id)
colNameIdMap[col.Name] = col.Id
if constraint.isPk {
keys = append(keys, schema.Key{ColId: col.Id})
}
if constraint.fk.ColumnNames != nil {
fkeys = append(fkeys, constraint.fk)
}
if constraint.isUniqueKey {
// Convert unique column constraint in MySQL to a corresponding unique index in Spanner since
// Spanner doesn't support unique constraints on columns.
// TODO: Avoid Spanner-specific schema transformations in this file -- they should only
// appear in toddl.go. This file should focus on generic transformation from source
// database schemas into schema.go.
idxId := internal.GenerateIndexesId()
index = append(index, schema.Index{
Name: "",
Id: idxId,
Unique: true,
Keys: []schema.Key{
{
ColId: col.Id,
Desc: false,
},
},
})
}
}
conv.SchemaStatement(NodeType(stmt))
conv.SrcSchema[tableId] = schema.Table{
Id: tableId,
Name: tableName,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDef,
PrimaryKeys: keys,
ForeignKeys: fkeys,
Indexes: index,
CheckConstraints: checkConstraints,
}
for _, constraint := range stmt.Constraints {
processConstraint(conv, tableId, constraint, "CREATE TABLE", conv.SrcSchema[tableId].ColNameIdMap)
}
}
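// processConstraint updates the source schema of tableId for a table-level
// constraint: a primary key replaces any existing key set, foreign keys and
// (unique) indexes are appended, and anything else only annotates columns.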
func processConstraint(conv *internal.Conv, tableId string, constraint *ast.Constraint, stmtType string, colNameToIdMap map[string]string) {
st := conv.SrcSchema[tableId]
switch ct := constraint.Tp; ct {
case ast.ConstraintPrimaryKey:
checkEmpty(conv, st.PrimaryKeys, stmtType) // Drop any previous primary keys.
st.PrimaryKeys = toSchemaKeys(constraint.Keys, colNameToIdMap)
// In Spanner, primary key columns are usually annotated with NOT NULL,
// but this can be omitted to allow NULL values in key columns.
// In MySQL, the primary key constraint is a combination of
// NOT NULL and UNIQUE i.e. primary keys must be NOT NULL and UNIQUE.
// We preserve MySQL semantics and enforce NOT NULL and UNIQUE.
updateCols(conv, ast.ConstraintPrimaryKey, constraint.Keys, st.ColDefs, colNameToIdMap)
case ast.ConstraintForeignKey:
st.ForeignKeys = append(st.ForeignKeys, toForeignKeys(conv, constraint))
case ast.ConstraintIndex:
idxId := internal.GenerateIndexesId()
st.Indexes = append(st.Indexes, schema.Index{Name: constraint.Name, Id: idxId, Keys: toSchemaKeys(constraint.Keys, colNameToIdMap)})
case ast.ConstraintUniq:
idxId := internal.GenerateIndexesId()
// Convert unique column constraint in mysql to a corresponding unique index in schema
// Note that schema represents all unique constraints as indexes.
st.Indexes = append(st.Indexes, schema.Index{Name: constraint.Name, Id: idxId, Unique: true, Keys: toSchemaKeys(constraint.Keys, colNameToIdMap)})
default:
updateCols(conv, ct, constraint.Keys, st.ColDefs, colNameToIdMap)
}
conv.SrcSchema[tableId] = st
}
// getCheckConstraints returns the check constraints declared in a CREATE TABLE statement, using the TiDB parser's AST.
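// For example (illustrative): the parser restores a CHECK expression such as
//   `status` = _utf8mb4'active'
// and dbcollationRegex strips the collation introducer, leaving
//   `status` = 'active'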
func getCheckConstraints(constraints []*ast.Constraint) (checkConstraints []schema.CheckConstraint) {
for _, constraint := range constraints {
if constraint.Tp == ast.ConstraintCheck {
exp := expressionToString(constraint.Expr)
exp = dbcollationRegex.ReplaceAllString(exp, "$1")
exp = checkAndAddParentheses(exp)
checkConstraint := schema.CheckConstraint{
Name: constraint.Name,
Expr: exp,
ExprId: internal.GenerateExpressionId(),
Id: internal.GenerateCheckConstrainstId(),
}
checkConstraints = append(checkConstraints, checkConstraint)
}
}
return checkConstraints
}
// expressionToString converts an AST expression node to its string representation.
func expressionToString(expr ast.Node) string {
var sb strings.Builder
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)
if err := expr.Restore(restoreCtx); err != nil {
logger.Log.Debug(fmt.Sprintf("error restoring expression: %v", err))
return ""
}
return sb.String()
}
// toSchemaKeys converts a string list of MySQL keys to schema keys.
// Note that we map all MySQL keys to ascending ordered schema keys.
// For primary keys: this is fine because MySQL primary keys are always ascending.
// However, for non-primary keys (aka indexes) this is incorrect: we drop
// the MySQL key order specification, as the mysqldump parser is not able to
// parse the key order. Check this for more details:
// https://github.com/GoogleCloudPlatform/spanner-migration-tool/issues/96
// TODO: Resolve ordering issue for non-primary keys.
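// For example (illustrative): PRIMARY KEY (`a`,`b`) with colNameToIdMap
// {"a": "c1", "b": "c2"} yields [{ColId: "c1"}, {ColId: "c2"}], both
// implicitly ascending.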
func toSchemaKeys(columns []*ast.IndexPartSpecification, colNameToIdMap map[string]string) (keys []schema.Key) {
for _, spec := range columns {
specColName := spec.Column.OrigColName()
if colId, ok := colNameToIdMap[specColName]; ok {
keys = append(keys, schema.Key{ColId: colId})
}
}
return keys
}
// toForeignKeys converts a MySQL AST foreign key constraint to
// a schema foreign key.
func toForeignKeys(conv *internal.Conv, fk *ast.Constraint) (fkey schema.ForeignKey) {
columns := fk.Keys
referTable, err := getTableName(fk.Refer.Table)
if err != nil {
conv.Unexpected(err.Error())
return schema.ForeignKey{}
}
referColumns := fk.Refer.IndexPartSpecifications
var colNames, referColNames []string
for i, column := range columns {
colNames = append(colNames, column.Column.Name.String())
referColNames = append(referColNames, referColumns[i].Column.Name.String())
}
onDelete := fk.Refer.OnDelete.ReferOpt.String()
onUpdate := fk.Refer.OnUpdate.ReferOpt.String()
if onDelete == "" {
onDelete = constants.FK_NO_ACTION
}
if onUpdate == "" {
onUpdate = constants.FK_NO_ACTION
}
fkey = schema.ForeignKey{
Id: internal.GenerateForeignkeyId(),
Name: fk.Name,
ColumnNames: colNames,
ReferTableName: referTable,
ReferColumnNames: referColNames,
OnDelete: onDelete,
OnUpdate: onUpdate}
return fkey
}
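// updateCols annotates the named columns for a constraint type: CHECK marks
// the column's check constraint as ignored, and PRIMARY KEY marks the
// column NOT NULL.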
func updateCols(conv *internal.Conv, ct ast.ConstraintType, colNames []*ast.IndexPartSpecification, colDef map[string]schema.Column, colNameToIdMap map[string]string) {
for _, column := range colNames {
colName := column.Column.OrigColName()
cid := colNameToIdMap[colName]
cd := colDef[cid]
switch ct {
case ast.ConstraintCheck:
cd.Ignored.Check = true
case ast.ConstraintPrimaryKey:
cd.NotNull = true
}
colDef[cid] = cd
}
}
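// processAlterTable handles ALTER TABLE statements: added constraints and
// column modifications update the source schema; all other alteration
// specs are counted as skipped statements.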
func processAlterTable(conv *internal.Conv, stmt *ast.AlterTableStmt) {
if stmt.Table == nil {
logStmtError(conv, stmt, fmt.Errorf("table is nil"))
return
}
tableName, err := getTableName(stmt.Table)
if err != nil {
logStmtError(conv, stmt, fmt.Errorf("can't get table name: %w", err))
return
}
if tbl, ok := internal.GetSrcTableByName(conv.SrcSchema, tableName); ok {
for _, item := range stmt.Specs {
switch alterType := item.Tp; alterType {
case ast.AlterTableAddConstraint:
processConstraint(conv, tbl.Id, item.Constraint, "ALTER TABLE", tbl.ColNameIdMap)
conv.SchemaStatement(NodeType(stmt))
case ast.AlterTableModifyColumn:
colname, col, constraint, err := processColumn(conv, tableName, item.NewColumns[0])
if err != nil {
logStmtError(conv, stmt, err)
return
}
col.Id = tbl.ColNameIdMap[colname]
conv.SrcSchema[tbl.Id].ColDefs[col.Id] = col
if constraint.isPk {
ctable := conv.SrcSchema[tbl.Id]
checkEmpty(conv, ctable.PrimaryKeys, "ALTER TABLE")
ctable.PrimaryKeys = []schema.Key{{ColId: col.Id}}
conv.SrcSchema[tbl.Id] = ctable
}
if constraint.fk.ColumnNames != nil {
ctable := conv.SrcSchema[tbl.Id]
ctable.ForeignKeys = append(ctable.ForeignKeys, constraint.fk)
conv.SrcSchema[tbl.Id] = ctable
}
if constraint.isUniqueKey {
// Convert unique column constraint in mysql to a corresponding unique index in schema
// Note that schema represents all unique constraints as indexes.
ctable := conv.SrcSchema[tbl.Id]
ctable.Indexes = append(ctable.Indexes, schema.Index{Name: "", Id: internal.GenerateIndexesId(), Unique: true, Keys: []schema.Key{{ColId: col.Id, Desc: false}}})
conv.SrcSchema[tbl.Id] = ctable
}
conv.SchemaStatement(NodeType(stmt))
default:
conv.SkipStatement(NodeType(stmt))
}
}
} else {
conv.SkipStatement(NodeType(stmt))
}
}
// getTableName extracts the table name from *ast.TableName table, and returns
// the raw extracted name (the MySQL table name).
// *ast.TableName is used to represent table names. It consists of two components:
//
// Schema: schemas in MySQL db often unspecified;
// Name: name of the table
//
// We build a table name from these components as follows:
// a) nil components are dropped.
// b) if more than one component is specified, they are joined using "."
//
// (Note that Spanner doesn't allow "." in table names, so this
// will eventually get re-mapped when we construct the Spanner table name).
//
// c) an error is returned if the table name is empty.
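//
// For example (illustrative): `CREATE TABLE mydb.cart ...` yields
// "mydb.cart", while `CREATE TABLE cart ...` yields "cart".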
func getTableName(table *ast.TableName) (string, error) {
var l []string
if table.Schema.String() != "" {
l = append(l, table.Schema.String())
}
if table.Name.String() == "" {
return "", fmt.Errorf("tablename is empty: can't build table name")
}
l = append(l, table.Name.String())
return strings.Join(l, "."), nil
}
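// processColumn converts an *ast.ColumnDef into a schema.Column, returning
// the column name, the column itself, and any column-level constraints
// (primary key, unique, foreign key) found among its options.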
func processColumn(conv *internal.Conv, tableName string, col *ast.ColumnDef) (string, schema.Column, columnConstraint, error) {
if col.Name == nil {
return "", schema.Column{}, columnConstraint{}, fmt.Errorf("column name is nil")
}
name := col.Name.OrigColName()
if col.Tp == nil {
return "", schema.Column{}, columnConstraint{}, fmt.Errorf("can't get column type for %s: %w", name, fmt.Errorf("found nil *ast.ColumnDef.Tp"))
}
tid, mods := getTypeModsAndID(conv, col.Tp.String())
ty := schema.Type{
Name: tid,
Mods: mods,
ArrayBounds: getArrayBounds(col.Tp.String(), col.Tp.GetElems())}
column := schema.Column{Name: name, Type: ty}
return name, column, updateColsByOption(conv, tableName, col, &column), nil
}
type columnConstraint struct {
isPk bool
isUniqueKey bool
fk schema.ForeignKey
}
// updateColsByOption is specifically for ColDef constraints.
// ColumnOption type is used for parsing column constraint info from MySQL.
func updateColsByOption(conv *internal.Conv, tableName string, col *ast.ColumnDef, column *schema.Column) columnConstraint {
var cc columnConstraint
for _, elem := range col.Options {
switch op := elem.Tp; op {
case ast.ColumnOptionPrimaryKey:
column.NotNull = true
// If primary key is defined in a column then `isPk` will be true
// and this column will be added in colDef as primary keys.
cc.isPk = true
case ast.ColumnOptionNotNull:
column.NotNull = true
case ast.ColumnOptionAutoIncrement:
column.Ignored.AutoIncrement = true
case ast.ColumnOptionDefaultValue:
// If a data type specification includes no explicit DEFAULT
// value, MySQL determines whether the column can take NULL as a value
// and, if so, mysqldump emits the column with a DEFAULT NULL clause.
// This case is excluded from issue reporting of the 'Default' value.
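// For example (illustrative): `name` varchar(40) DEFAULT NULL parses to a
// ValueExpr whose value is nil and is not flagged, whereas
// `name` varchar(40) DEFAULT 'x' sets column.Ignored.Default.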
v, ok := elem.Expr.(*driver.ValueExpr)
nullDefault := ok && v.GetValue() == nil
if !nullDefault {
column.Ignored.Default = true
}
case ast.ColumnOptionUniqKey:
cc.isUniqueKey = true
case ast.ColumnOptionCheck:
column.Ignored.Check = true
case ast.ColumnOptionReference:
colName := col.Name.String()
referTable, err := getTableName(elem.Refer.Table)
if err != nil {
conv.Unexpected(err.Error())
continue
}
referColumn := elem.Refer.IndexPartSpecifications[0].Column.Name.String()
// Note that foreign key constraints that are part of a column definition
// have no name, so we leave fkey.Name as the empty string.
fkey := schema.ForeignKey{
ColumnNames: []string{colName},
ReferTableName: referTable,
ReferColumnNames: []string{referColumn},
OnDelete: elem.Refer.OnDelete.ReferOpt.String(),
OnUpdate: elem.Refer.OnUpdate.ReferOpt.String()}
cc.fk = fkey
}
}
return cc
}
// getTypeModsAndID returns the ID and mods of a column datatype.
func getTypeModsAndID(conv *internal.Conv, columnType string) (string, []int64) {
// There are no methods in the Pingcap parser to retrieve the ID and mods,
// so we process columnType (e.g. 'varchar(40)') and split the ID from the string.
// We retrieve the mods using a regular expression and convert them to int64.
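// For example (illustrative):
//   getTypeModsAndID(conv, "varchar(40)")  returns ("varchar", [40])
//   getTypeModsAndID(conv, "decimal(8,2)") returns ("decimal", [8 2])
//   getTypeModsAndID(conv, "set('a','b')") returns ("set", nil)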
id := columnType
var mods []int64
if strings.Contains(columnType, "(") {
id = strings.Split(columnType, "(")[0]
// For 'set' and 'enum' datatypes, the parenthesized values are members, not length modifiers.
if id == "set" || id == "enum" {
return id, nil
}
values := valuesRegexp.FindString(columnType)
strMods := strings.Split(values[1:len(values)-1], ",")
for _, i := range strMods {
j, err := strconv.ParseInt(i, 10, 64)
if err != nil {
conv.Unexpected(fmt.Sprintf("Unable to get modifiers for `%s` datatype.", id))
return id, nil
}
mods = append(mods, j)
}
}
// The parser appends a 'BINARY' keyword suffix to blob datatypes,
// e.g. "mediumblob BINARY". It needs to be trimmed to retrieve the ID.
if strings.Contains(id, " ") {
id = strings.TrimSuffix(id, " BINARY")
}
return id, mods
}
// handleParseError handles errors hit while parsing mysqldump
// statements and attempts to create a parsable chunk.
// Errors can be due to an insert statement, unsupported spatial
// datatypes in a create statement, or unsupported stored programs.
func handleParseError(conv *internal.Conv, chunk string, err error, l [][]byte) ([]ast.StmtNode, bool) {
// Check the error for statements that are not supported by the Pingcap
// parser, such as delimiters, functions, procedures and triggers.
// If the error is due to a delimiter, we reparse until the chunk
// contains 2 delimiters, which is the typical way delimiters
// are used by mysqldump, e.g.
//   DELIMITER ;; - The first one redefines the delimiter to something
//                  unusual. What follows is the definition of a function,
//                  procedure, or trigger, which can freely use ';' in its body.
//   DELIMITER ;  - The second one restores the default delimiter.
// We also handle the case of functions, procedures or triggers
// without a delimiter statement.
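// For example (an illustrative dump fragment):
//   DELIMITER ;;
//   CREATE TRIGGER trg BEFORE INSERT ON cart FOR EACH ROW
//   BEGIN SET NEW.qty = 1; END ;;
//   DELIMITER ;
// The chunk keeps growing until it contains both DELIMITER lines; at that
// point skipUnsupported records the whole fragment as a skipped
// CreateTrigStmt.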
errMsg := strings.ToLower(err.Error())
if unsupportedRegexp.MatchString(errMsg) || strings.Contains(errMsg, "delimiter") {
if strings.Count(strings.ToLower(chunk), "delimiter") == 1 {
return nil, false
}
return nil, skipUnsupported(conv, strings.ToLower(chunk))
}
// Check if the error is due to an insert statement.
insertStmtPrefix := insertRegexp.FindString(chunk)
if insertStmtPrefix != "" {
// Send the chunk as a list of values and insertStmtPrefix separately
// to avoid column names being treated as values by valuesRegexp.
// E.g.: INSERT INTO mytable (a, b, c) VALUES (1, 2, 3),(4, 5, 6);
// insertStmtPrefix = INSERT INTO mytable (a, b, c) VALUES
// valuesChunk = (1, 2, 3),(4, 5, 6);
valuesChunk := insertRegexp.Split(chunk, 2)[1] // stripping off insertStmtPrefix
return handleInsertStatement(conv, valuesChunk, insertStmtPrefix)
}
// Handle the error if it is due to a spatial datatype, which the Pingcap parser does not support.
for _, spatial := range MysqlSpatialDataTypes {
if strings.Contains(errMsg, `near "`+spatial) {
if conv.SchemaMode() {
conv.Unexpected(fmt.Sprintf("Unsupported datatype '%s' encountered while parsing following statement at line number %d : \n%s", spatial, len(l), chunk))
internal.VerbosePrintf("Converting datatype '%s' to 'Text' and retrying to parse the statement\n", spatial)
logger.Log.Debug(fmt.Sprintf("Converting datatype '%s' to 'Text' and retrying to parse the statement\n", spatial))
}
return handleSpatialDatatype(conv, chunk, l)
}
}
return nil, false
}
// handleInsertStatement handles error in parsing the insert statement.
// Likely causes of failing to parse an insert statement:
//
// a) Some invalid value.
// b) The chunk is larger than the Pingcap parser can handle (more than 40MB).
//
// We deal with these cases by extracting all rows and creating
// extended insert statements. Then we parse one insert statement
// at a time, ensuring no size issue and skipping only invalid entries.
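// For example (illustrative): with
//   insertStmtPrefix = INSERT INTO cart (id, qty) VALUES
//   chunk            = (1, 2),(3, 4);
// each row is re-parsed separately as
//   INSERT INTO cart (id, qty) VALUES (1, 2);
//   INSERT INTO cart (id, qty) VALUES (3, 4);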
func handleInsertStatement(conv *internal.Conv, chunk, insertStmtPrefix string) ([]ast.StmtNode, bool) {
var stmts []ast.StmtNode
values := valuesRegexp.FindAllString(chunk, -1)
if len(values) == 0 {
return nil, false
}
for _, value := range values {
chunk = insertStmtPrefix + value + ";"
newTree, _, err := parser.New().Parse(chunk, "", "")
if err != nil {
if conv.SchemaMode() {
conv.Unexpected(fmt.Sprintf("Either unsupported value is encountered or syntax is incorrect for following statement : \n%s", chunk))
}
conv.SkipStatement("InsertStmt")
continue
}
stmts = append(stmts, newTree[0])
}
return stmts, true
}
// handleSpatialDatatype handles error in parsing spatial datatype.
// We parse chunk again after taking these actions:
// a) Replace spatial datatype with 'text'.
// b) Remove 'SPATIAL' keyword from Index/Key.
// c) Remove SRID(spatial reference identifier) attribute.
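// For example (illustrative): a column definition such as
//   `loc` point NOT NULL SRID 4326
// is rewritten to
//   `loc` text NOT NULL
// before re-parsing.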
func handleSpatialDatatype(conv *internal.Conv, chunk string, l [][]byte) ([]ast.StmtNode, bool) {
if !conv.SchemaMode() {
return nil, true
}
for _, spatialRegexp := range spatialRegexps {
chunk = spatialRegexp.ReplaceAllString(chunk, " text")
}
chunk = spatialIndexRegex.ReplaceAllString(chunk, "")
chunk = spatialSridRegex.ReplaceAllString(chunk, "")
newTree, _, err := parser.New().Parse(chunk, "", "")
if err != nil {
return nil, false
}
return newTree, true
}
// skipUnsupported skips the stored programs that are not supported
// by the Pingcap parser.
func skipUnsupported(conv *internal.Conv, chunk string) bool {
createOrDrop := "Create"
if strings.Contains(chunk, "drop") {
createOrDrop = "Drop"
}
switch {
case strings.Contains(chunk, "trigger"):
conv.SkipStatement(createOrDrop + "TrigStmt")
case strings.Contains(chunk, "procedure"):
conv.SkipStatement(createOrDrop + "ProcedureStmt")
case strings.Contains(chunk, "function"):
conv.SkipStatement(createOrDrop + "FunctionStmt")
default:
return false
}
return true
}
// getArrayBounds calculates the array bound for the 'set' data type only;
// we do not expect multidimensional arrays.
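// For example (illustrative): getArrayBounds("set('a','b','c')",
// []string{"a", "b", "c"}) returns []int64{3}; any non-set type returns nil.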
func getArrayBounds(ft string, elem []string) []int64 {
if strings.HasPrefix(ft, "set") {
return []int64{int64(len(elem))}
}
return nil
}
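// processInsertStmt processes an INSERT statement. In schema mode it only
// counts the rows for stats; in data mode it converts each row and sends
// it to the data sink via ProcessDataRow.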
func processInsertStmt(conv *internal.Conv, stmt *ast.InsertStmt) {
if stmt.Table == nil {
logStmtError(conv, stmt, fmt.Errorf("source table is nil"))
return
}
srcTable, err := getTableNameInsert(stmt.Table)
if err != nil {
logStmtError(conv, stmt, fmt.Errorf("can't get source table name: %w", err))
return
}
tableId, _ := internal.GetTableIdFromSrcName(conv.SrcSchema, srcTable)
if conv.SchemaMode() {
conv.Stats.Rows[srcTable] += int64(len(stmt.Lists))
conv.DataStatement(NodeType(stmt))
return
}
srcSchema, ok := conv.SrcSchema[tableId]
if !ok {
conv.Unexpected(fmt.Sprintf("Can't get schemas for table %s", srcTable))
conv.Stats.BadRows[srcTable] += conv.Stats.Rows[srcTable]
return
}
srcColIds := []string{}
srcCols, err2 := getCols(stmt)
if err2 != nil {
// In MySQL, column names might not be specified in the insert statement,
// so instead of throwing an error we try to retrieve columns from the source schema.
for _, srcColId := range conv.SrcSchema[tableId].ColIds {
srcCols = append(srcCols, conv.SrcSchema[tableId].ColDefs[srcColId].Name)
srcColIds = append(srcColIds, srcColId)
}
if len(srcColIds) == 0 {
conv.Unexpected(fmt.Sprintf("Can't get columns for table %s", srcTable))
conv.Stats.BadRows[srcTable] += conv.Stats.Rows[srcTable]
return
}
} else {
for _, srcColName := range srcCols {
colId, _ := internal.GetColIdFromSrcName(conv.SrcSchema[tableId].ColDefs, srcColName)
srcColIds = append(srcColIds, colId)
}
}
var values []string
if stmt.Lists == nil {
logStmtError(conv, stmt, fmt.Errorf("can't get column values"))
return
}
commonColIds := common.IntersectionOfTwoStringSlices(conv.SpSchema[tableId].ColIds, srcColIds)
spSchema := conv.SpSchema[tableId]
colNameIdMap := internal.GetSrcColNameIdMap(conv.SrcSchema[tableId])
for _, row := range stmt.Lists {
values, err = getVals(row)
if err != nil {
conv.Unexpected(fmt.Sprintf("Error while extracting values: %s\n", err))
conv.StatsAddBadRow(srcSchema.Name, conv.DataMode())
conv.CollectBadRow(srcSchema.Name, srcCols, values)
continue
}
// prepare values
newValues, err2 := common.PrepareValues(conv, tableId, colNameIdMap, commonColIds, srcCols, values)
if err2 != nil {
conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err2))
conv.StatsAddBadRow(srcSchema.Name, conv.DataMode())
conv.CollectBadRow(srcSchema.Name, srcCols, values)
continue
}
ProcessDataRow(conv, tableId, commonColIds, srcSchema, spSchema, newValues, internal.AdditionalDataAttributes{ShardId: ""})
}
}
func getCols(stmt *ast.InsertStmt) ([]string, error) {
if stmt.Columns == nil {
return nil, fmt.Errorf("No columns found in insert statement ")
}
var colnames []string
for _, column := range stmt.Columns {
colnames = append(colnames, column.OrigColName())
}
return colnames, nil
}
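// getVals extracts string representations of the values in one insert row.
// Negative numeric literals arrive from the parser as a unary-minus
// expression wrapping a positive literal, so they are handled separately
// via getNegativeUnaryVals.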
func getVals(row []ast.ExprNode) ([]string, error) {
if len(row) == 0 {
return nil, fmt.Errorf("Found row with zero length")
}
var values []string
for _, item := range row {
switch valueNode := item.(type) {
case *driver.ValueExpr:
values = append(values, fmt.Sprintf("%v", valueNode.GetValue()))
case *ast.UnaryOperationExpr:
if valueNode.Op != opcode.Minus {
return nil, fmt.Errorf("unexpected UnaryOperationExpr node with opcode %v", valueNode.Op)
}
valExpr, ok := valueNode.V.(*driver.ValueExpr)
if !ok {
return nil, fmt.Errorf("unexpected UnaryOperationExpr node with value type %T", valueNode.V)
}
value, err := getNegativeUnaryVals(valExpr)
if err != nil {
return nil, fmt.Errorf("unexpected UnaryOperationExpr node with value %v", valExpr.GetValue())
}
values = append(values, value)
default:
return nil, fmt.Errorf("unexpected value node %T", valueNode)
}
}
return values, nil
}
func getNegativeUnaryVals(valExpr *driver.ValueExpr) (string, error) {
switch val := valExpr.GetValue().(type) {
case int64:
return fmt.Sprintf("%v", -1*val), nil
case *types.MyDecimal:
floatVal, err := val.ToFloat64()
if err != nil {
return "", fmt.Errorf("unexpected UnaryOperationExpr with value %v", val)
}
return fmt.Sprintf("%v", -1*floatVal), nil
default:
return "", fmt.Errorf("unexpected UnaryOperationExpr value with type %T", val)
}
}
func getTableNameInsert(stmt *ast.TableRefsClause) (string, error) {
if stmt.TableRefs == nil {
return "", fmt.Errorf("can't build table name as tablerefs is empty")
}
if stmt.TableRefs.Left == nil {
return "", fmt.Errorf("can't build table name as Tablerefs.Left is empty")
}
if table, ok := stmt.TableRefs.Left.(*ast.TableSource); ok {
if tablenode, ok := table.Source.(*ast.TableName); ok {
return getTableName(tablenode)
}
return "", fmt.Errorf("Can't build table name as table source is of different type")
}
return "", fmt.Errorf("stmt.TableRefs.Left is different type, can't build table name")
}
func logStmtError(conv *internal.Conv, stmt ast.StmtNode, err error) {
conv.Unexpected(fmt.Sprintf("Processing %v statement: %s", reflect.TypeOf(stmt), err))
conv.ErrorInStatement(NodeType(stmt))
}
// checkEmpty verifies that pkeys is empty and generates a warning if it isn't.
// MySQL explicitly forbids multiple primary keys.
func checkEmpty(conv *internal.Conv, pkeys []schema.Key, stmtType string) {
if len(pkeys) != 0 {
conv.Unexpected(fmt.Sprintf("Multiple primary keys found. `%s` statement is overwriting primary key", stmtType))
}
}
// NodeType strips off the "*ast." prefix from the concrete type of an ast.StmtNode.
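// For example (illustrative), NodeType of an *ast.InsertStmt is "InsertStmt".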
func NodeType(n ast.StmtNode) string {
return strings.TrimPrefix(reflect.TypeOf(n).String(), "*ast.")
}