in pkg/datasource/sql/undo/builder/mysql_insert_undo_log_builder.go [88:139]
// buildAfterImageSQL builds the SELECT statement, and its bind arguments, that
// looks up the rows targeted by the INSERT statement held in execCtx by their
// primary-key values.
//
// The table name is taken from the parsed INSERT statement, the table metadata
// from execCtx.MetaDataMap, and the primary-key values from getPkValues. The
// resulting SQL has the form "SELECT * FROM <table> WHERE <pk condition> ".
//
// ctx is currently not used by this method.
//
// It returns an error (never panics) when:
//   - execCtx, its ParseContext or its InsertStmt is nil;
//   - the INSERT target is not a plain table reference;
//   - MetaDataMap is nil or has no entry for the table;
//   - the primary-key values or types cannot be resolved, or are inconsistent.
func (u *MySQLInsertUndoLogBuilder) buildAfterImageSQL(ctx context.Context, execCtx *types.ExecContext) (string, []driver.Value, error) {
	if execCtx == nil || execCtx.ParseContext == nil || execCtx.ParseContext.InsertStmt == nil {
		return "", nil, fmt.Errorf("can't found execCtx or ParseContext or InsertStmt")
	}
	parseCtx := execCtx.ParseContext

	// Resolve the target table name. Every step is checked so that an
	// unexpected AST shape yields an error instead of a nil dereference or a
	// failed type assertion panic.
	insertStmt := parseCtx.InsertStmt
	if insertStmt.Table == nil || insertStmt.Table.TableRefs == nil {
		return "", nil, fmt.Errorf("can't find table refs in InsertStmt")
	}
	tableSource, ok := insertStmt.Table.TableRefs.Left.(*ast.TableSource)
	if !ok || tableSource == nil {
		return "", nil, fmt.Errorf("unsupported insert target: expected a table source")
	}
	tableNameNode, ok := tableSource.Source.(*ast.TableName)
	if !ok || tableNameNode == nil {
		return "", nil, fmt.Errorf("unsupported insert target: expected a table name")
	}
	tableName := tableNameNode.Name.O

	if execCtx.MetaDataMap == nil {
		return "", nil, fmt.Errorf("can't found MetaDataMap")
	}
	meta, ok := execCtx.MetaDataMap[tableName]
	if !ok {
		return "", nil, fmt.Errorf("can't find table meta for table %s", tableName)
	}

	// get all pk value
	pkValuesMap, err := u.getPkValues(execCtx, parseCtx, meta)
	if err != nil {
		return "", nil, err
	}
	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return "", nil, fmt.Errorf("Pk columnName size is zero")
	}
	dataTypeMap, err := meta.GetPrimaryKeyTypeStrMap()
	if err != nil {
		return "", nil, err
	}
	if len(dataTypeMap) != len(pkColumnNameList) {
		return "", nil, fmt.Errorf("PK columnName size don't equal PK DataType size")
	}

	// The number of rows is driven by the first primary-key column. Every
	// other primary-key column must supply at least that many values,
	// otherwise indexing below would go out of range.
	rowSize := len(pkValuesMap[pkColumnNameList[0]])
	for _, name := range pkColumnNameList {
		if got := len(pkValuesMap[name]); got < rowSize {
			return "", nil, fmt.Errorf("pk column %s has %d values, expected %d", name, got, rowSize)
		}
	}

	// One single-column RowImage per (row, pk column) pair, emitted row by row
	// in primary-key column order.
	pkRowImages := make([]types.RowImage, 0, rowSize*len(pkColumnNameList))
	for i := 0; i < rowSize; i++ {
		for _, name := range pkColumnNameList {
			pkRowImages = append(pkRowImages, types.RowImage{
				Columns: []types.ColumnImage{{
					KeyType:    types.IndexTypePrimaryKey,
					ColumnName: name,
					ColumnType: types.MySQLStrToJavaType(dataTypeMap[name]),
					Value:      pkValuesMap[name][i],
				}},
			})
		}
	}

	// build check sql
	whereSQL := u.buildWhereConditionByPKs(pkColumnNameList, rowSize, "mysql", maxInSize)
	sb := strings.Builder{}
	sb.WriteString("SELECT * FROM " + tableName)
	sb.WriteString(" WHERE " + whereSQL + " ")
	return sb.String(), u.buildPKParams(pkRowImages, pkColumnNameList), nil
}