pkg/datasource/sql/exec/at/delete_executor.go (120 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package at import ( "context" "database/sql/driver" "fmt" "github.com/arana-db/parser/ast" "github.com/arana-db/parser/format" "seata.apache.org/seata-go/pkg/datasource/sql/datasource" "seata.apache.org/seata-go/pkg/datasource/sql/exec" "seata.apache.org/seata-go/pkg/datasource/sql/parser" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/util" "seata.apache.org/seata-go/pkg/util/bytes" "seata.apache.org/seata-go/pkg/util/log" ) // deleteExecutor execute delete SQL type deleteExecutor struct { baseExecutor parserCtx *types.ParseContext execContext *types.ExecContext } // NewDeleteExecutor get delete executor func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { return &deleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}} } // ExecContext exec SQL, and generate before image and after image func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) { d.beforeHooks(ctx, d.execContext) defer func() { d.afterHooks(ctx, d.execContext) }() beforeImage, err := d.beforeImage(ctx) if err != nil { return nil, err } res, err := f(ctx, d.execContext.Query, d.execContext.NamedValues) if err != nil { return nil, err } afterImage, err := d.afterImage(ctx) if err != nil { return nil, err } d.execContext.TxCtx.RoundImages.AppendBeofreImage(beforeImage) d.execContext.TxCtx.RoundImages.AppendAfterImage(afterImage) return res, nil } // beforeImage build before image func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) { selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContext.Query, d.execContext.NamedValues) if err != nil { return nil, err } var rowsi driver.Rows queryerCtx, ok := d.execContext.Conn.(driver.QueryerContext) var queryer driver.Queryer if !ok { queryer, ok = d.execContext.Conn.(driver.Queryer) } if ok { rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, selectArgs) defer func() { if rowsi != nil { rowsi.Close() } }() if err != nil { log.Errorf("ctx driver query: %+v", err) return nil, err } } else { log.Errorf("target conn should been driver.QueryerContext or driver.Queryer") return nil, fmt.Errorf("invalid conn") } tableName, _ := d.parserCtx.GetTableName() metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName) if err != nil { return nil, err } image, err := d.buildRecordImages(rowsi, metaData, types.SQLTypeDelete) if err != nil { return nil, err } image.SQLType = types.SQLTypeDelete image.TableMeta = metaData lockKey := d.buildLockKey(image, *metaData) d.execContext.TxCtx.LockKeys[lockKey] = struct{}{} return image, nil } // buildBeforeImageSQL build delete sql from delete sql func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedValue) (string, []driver.NamedValue, error) { p, err := parser.DoParser(query) if err != nil { return "", nil, err } if p.DeleteStmt == nil { log.Errorf("invalid delete stmt") return "", nil, fmt.Errorf("invalid delete stmt") } selStmt := ast.SelectStmt{ SelectStmtOpts: &ast.SelectStmtOpts{}, From: p.DeleteStmt.TableRefs, Where: p.DeleteStmt.Where, Fields: &ast.FieldList{Fields: []*ast.SelectField{{WildCard: &ast.WildCardField{}}}}, OrderBy: p.DeleteStmt.Order, Limit: p.DeleteStmt.Limit, TableHints: p.DeleteStmt.TableHints, LockInfo: &ast.SelectLockInfo{ LockType: ast.SelectLockForUpdate, }, } b := bytes.NewByteBuffer([]byte{}) _ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b)) sql := string(b.Bytes()) log.Infof("build select sql by delete sourceQuery, sql {%s}", sql) return sql, d.buildSelectArgs(&selStmt, args), nil } // afterImage build after image func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) { tableName, _ := d.parserCtx.GetTableName() metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName) if err != nil { return nil, err } return types.NewEmptyRecordImage(metaData, types.SQLTypeDelete), nil }