in pkg/datasource/sql/exec/at/update_executor.go [215:283]
func (u *updateExecutor) buildBeforeImageSQL(ctx context.Context, args []driver.NamedValue) (string, []driver.NamedValue, error) {
if !u.isAstStmtValid() {
log.Errorf("invalid update stmt")
return "", nil, fmt.Errorf("invalid update stmt")
}
updateStmt := u.parserCtx.UpdateStmt
fields := make([]*ast.SelectField, 0, len(updateStmt.List))
if undo.UndoConfig.OnlyCareUpdateColumns {
for _, column := range updateStmt.List {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Name: column.Column,
},
})
}
// select indexes columns
tableName, _ := u.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, u.execContext.DBName, tableName)
if err != nil {
return "", nil, err
}
for _, columnName := range metaData.GetPrimaryKeyOnlyName() {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Name: &ast.ColumnName{
Name: model.CIStr{
O: columnName,
L: columnName,
},
},
},
})
}
} else {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Name: &ast.ColumnName{
Name: model.CIStr{
O: "*",
L: "*",
},
},
},
})
}
selStmt := ast.SelectStmt{
SelectStmtOpts: &ast.SelectStmtOpts{},
From: updateStmt.TableRefs,
Where: updateStmt.Where,
Fields: &ast.FieldList{Fields: fields},
OrderBy: updateStmt.Order,
Limit: updateStmt.Limit,
TableHints: updateStmt.TableHints,
LockInfo: &ast.SelectLockInfo{
LockType: ast.SelectLockForUpdate,
},
}
b := bytes.NewByteBuffer([]byte{})
_ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b))
sql := string(b.Bytes())
log.Infof("build select sql by update sourceQuery, sql {%s}", sql)
return sql, u.buildSelectArgs(&selStmt, args), nil
}