pkg/datasource/sql/exec/at/multi_delete_executor.go (156 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package at import ( "bytes" "context" "database/sql/driver" "fmt" "github.com/arana-db/parser/ast" "github.com/arana-db/parser/format" "seata.apache.org/seata-go/pkg/datasource/sql/datasource" "seata.apache.org/seata-go/pkg/datasource/sql/exec" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/util" "seata.apache.org/seata-go/pkg/util/log" ) type multiDeleteExecutor struct { baseExecutor parserCtx *types.ParseContext execContext *types.ExecContext } func (m *multiDeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) { m.beforeHooks(ctx, m.execContext) defer func() { m.afterHooks(ctx, m.execContext) }() beforeImage, err := m.beforeImage(ctx) if err != nil { return nil, err } res, err := f(ctx, m.execContext.Query, m.execContext.NamedValues) if err != nil { return nil, err } afterImage, err := m.afterImage(ctx) if err != nil { return nil, err } m.execContext.TxCtx.RoundImages.AppendBeofreImages(beforeImage) m.execContext.TxCtx.RoundImages.AppendAfterImages(afterImage) return res, nil } type multiDelete struct { sql string clear bool } // NewMultiDeleteExecutor get multiDelete executor func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) *multiDeleteExecutor { return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}} } func (m *multiDeleteExecutor) beforeImage(ctx context.Context) ([]*types.RecordImage, error) { selectSQL, args, err := m.buildBeforeImageSQL() if err != nil { return nil, err } var ( rowsi driver.Rows image *types.RecordImage records []*types.RecordImage ) queryerCtx, ok := m.execContext.Conn.(driver.QueryerContext) var queryer driver.Queryer if !ok { queryer, ok = m.execContext.Conn.(driver.Queryer) } if !ok { log.Errorf("target conn should been driver.QueryerContext or driver.Queryer") return nil, fmt.Errorf("invalid conn") } rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, args) defer func() { if rowsi != nil { rowsi.Close() } }() if err != nil { log.Errorf("ctx driver query: %+v", err) return nil, err } tableName, err := m.getFromTableInSQL() if err != nil { return nil, err } metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName) if err != nil { return nil, err } image, err = m.buildRecordImages(rowsi, metaData, types.SQLTypeDelete) if err != nil { log.Errorf("record images : %+v", err) return nil, err } records = append(records, image) lockKey := m.buildLockKey(image, *metaData) m.execContext.TxCtx.LockKeys[lockKey] = struct{}{} return records, err } func (m *multiDeleteExecutor) afterImage(ctx context.Context) ([]*types.RecordImage, error) { tableName, _ := m.parserCtx.GetTableName() metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName) if err != nil { return nil, err } image := types.NewEmptyRecordImage(metaData, types.SQLTypeDelete) return []*types.RecordImage{image}, nil } func (m *multiDeleteExecutor) buildBeforeImageSQL() (string, []driver.NamedValue, error) { tableName, err := m.getFromTableInSQL() if err != nil { return "", nil, err } var ( // todo optimize replace * by use columns selectSQL = "SELECT SQL_NO_CACHE * FROM " + tableName params []driver.NamedValue whereCondition string hasWhereCondition = true ) for _, parser := range m.parserCtx.MultiStmt { deleteParser := parser.DeleteStmt if deleteParser == nil { continue } if deleteParser.Limit != nil { return "", nil, fmt.Errorf("Multi delete SQL with limit condition is not support yet!") } if deleteParser.Order != nil { return "", nil, fmt.Errorf("Multi delete SQL with orderBy condition is not support yet!") } if deleteParser.Where == nil || !hasWhereCondition { hasWhereCondition = false continue } var whereBuffer bytes.Buffer if err = deleteParser.Where.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, &whereBuffer)); err != nil { return "", nil, err } if whereCondition != "" { whereCondition += " OR " } whereCondition += fmt.Sprintf("(%s)", string(whereBuffer.Bytes())) newParams := m.buildSelectArgs(&ast.SelectStmt{Where: parser.DeleteStmt.Where}, m.execContext.NamedValues) params = append(params, newParams...) } if hasWhereCondition { selectSQL += " WHERE " + whereCondition } else { params = []driver.NamedValue{} } selectSQL += " FOR UPDATE" return selectSQL, params, nil } func (m *multiDeleteExecutor) getFromTableInSQL() (string, error) { for _, parser := range m.parserCtx.MultiStmt { if parser != nil { return parser.GetTableName() } } return "", fmt.Errorf("multi delete sql has no table name") }