in pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go [172:241]
// buildBeforeImageSQL derives, from the parsed UPDATE statement held in
// execCtx.ParseContext, a "SELECT ... FOR UPDATE" statement that reads the
// rows the UPDATE is about to touch (same table refs, WHERE, ORDER BY, LIMIT
// and table hints).
//
// When OnlyCareUpdateColumns is set, the select list contains the columns
// assigned by the UPDATE followed by the table's primary-key columns taken
// from the table-meta cache; otherwise a single "*" field is selected.
//
// It returns the restored SQL text together with the arguments produced by
// u.buildSelectArgs for that statement. An error is returned when the context
// carries no UPDATE statement, when the table name or table metadata cannot be
// resolved, or when the SELECT statement cannot be restored to SQL text.
func (u *MySQLUpdateUndoLogBuilder) buildBeforeImageSQL(ctx context.Context, execCtx *types.ExecContext, args []driver.Value) (string, []driver.Value, error) {
	updateStmt := execCtx.ParseContext.UpdateStmt
	if updateStmt == nil {
		log.Errorf("invalid update stmt")
		return "", nil, fmt.Errorf("invalid update stmt")
	}

	// columnField wraps a bare column name into a select-list field. Both the
	// original and the "lower" form of the identifier are set to the same raw
	// string, exactly as given.
	columnField := func(name string) *ast.SelectField {
		return &ast.SelectField{
			Expr: &ast.ColumnNameExpr{
				Name: &ast.ColumnName{
					Name: model.CIStr{O: name, L: name},
				},
			},
		}
	}

	fields := make([]*ast.SelectField, 0, len(updateStmt.List))
	// todo: OnlyCareUpdateColumns should load from config first
	if OnlyCareUpdateColumns {
		// Columns assigned by the UPDATE itself.
		for _, column := range updateStmt.List {
			fields = append(fields, &ast.SelectField{
				Expr: &ast.ColumnNameExpr{Name: column.Column},
			})
		}

		// Primary-key columns, so the selected rows can be identified.
		tableName, err := execCtx.ParseContext.GetTableName()
		if err != nil {
			// Previously ignored: a failure here would fall through to a
			// metadata lookup with an empty table name.
			return "", nil, fmt.Errorf("get table name of update stmt: %w", err)
		}
		metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, execCtx.DBName, tableName)
		if err != nil {
			return "", nil, err
		}
		for _, columnName := range metaData.GetPrimaryKeyOnlyName() {
			fields = append(fields, columnField(columnName))
		}
	} else {
		fields = append(fields, columnField("*"))
	}

	selStmt := ast.SelectStmt{
		SelectStmtOpts: &ast.SelectStmtOpts{},
		From:           updateStmt.TableRefs,
		Where:          updateStmt.Where,
		Fields:         &ast.FieldList{Fields: fields},
		OrderBy:        updateStmt.Order,
		Limit:          updateStmt.Limit,
		TableHints:     updateStmt.TableHints,
		// Lock the selected rows for the duration of the transaction.
		LockInfo: &ast.SelectLockInfo{
			LockType: ast.SelectLockForUpdate,
		},
	}

	b := bytes.NewByteBuffer([]byte{})
	if err := selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b)); err != nil {
		// Previously ignored: a failed restore would hand back a truncated or
		// empty SQL string with a nil error.
		return "", nil, fmt.Errorf("restore select sql from update stmt: %w", err)
	}
	sql := string(b.Bytes())
	log.Infof("build select sql by update sourceQuery, sql {%s}", sql)

	return sql, u.buildSelectArgs(&selStmt, args), nil
}