pkg/datasource/sql/exec/at/delete_executor.go (120 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package at
import (
"context"
"database/sql/driver"
"fmt"
"github.com/arana-db/parser/ast"
"github.com/arana-db/parser/format"
"seata.apache.org/seata-go/pkg/datasource/sql/datasource"
"seata.apache.org/seata-go/pkg/datasource/sql/exec"
"seata.apache.org/seata-go/pkg/datasource/sql/parser"
"seata.apache.org/seata-go/pkg/datasource/sql/types"
"seata.apache.org/seata-go/pkg/datasource/sql/util"
"seata.apache.org/seata-go/pkg/util/bytes"
"seata.apache.org/seata-go/pkg/util/log"
)
// deleteExecutor execute delete SQL
type deleteExecutor struct {
baseExecutor
parserCtx *types.ParseContext
execContext *types.ExecContext
}
// NewDeleteExecutor get delete executor
func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
return &deleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}
// ExecContext exec SQL, and generate before image and after image
func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
d.beforeHooks(ctx, d.execContext)
defer func() {
d.afterHooks(ctx, d.execContext)
}()
beforeImage, err := d.beforeImage(ctx)
if err != nil {
return nil, err
}
res, err := f(ctx, d.execContext.Query, d.execContext.NamedValues)
if err != nil {
return nil, err
}
afterImage, err := d.afterImage(ctx)
if err != nil {
return nil, err
}
d.execContext.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
d.execContext.TxCtx.RoundImages.AppendAfterImage(afterImage)
return res, nil
}
// beforeImage build before image
func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContext.Query, d.execContext.NamedValues)
if err != nil {
return nil, err
}
var rowsi driver.Rows
queryerCtx, ok := d.execContext.Conn.(driver.QueryerContext)
var queryer driver.Queryer
if !ok {
queryer, ok = d.execContext.Conn.(driver.Queryer)
}
if ok {
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, selectArgs)
defer func() {
if rowsi != nil {
rowsi.Close()
}
}()
if err != nil {
log.Errorf("ctx driver query: %+v", err)
return nil, err
}
} else {
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
return nil, fmt.Errorf("invalid conn")
}
tableName, _ := d.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName)
if err != nil {
return nil, err
}
image, err := d.buildRecordImages(rowsi, metaData, types.SQLTypeDelete)
if err != nil {
return nil, err
}
image.SQLType = types.SQLTypeDelete
image.TableMeta = metaData
lockKey := d.buildLockKey(image, *metaData)
d.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
return image, nil
}
// buildBeforeImageSQL build delete sql from delete sql
func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedValue) (string, []driver.NamedValue, error) {
p, err := parser.DoParser(query)
if err != nil {
return "", nil, err
}
if p.DeleteStmt == nil {
log.Errorf("invalid delete stmt")
return "", nil, fmt.Errorf("invalid delete stmt")
}
selStmt := ast.SelectStmt{
SelectStmtOpts: &ast.SelectStmtOpts{},
From: p.DeleteStmt.TableRefs,
Where: p.DeleteStmt.Where,
Fields: &ast.FieldList{Fields: []*ast.SelectField{{WildCard: &ast.WildCardField{}}}},
OrderBy: p.DeleteStmt.Order,
Limit: p.DeleteStmt.Limit,
TableHints: p.DeleteStmt.TableHints,
LockInfo: &ast.SelectLockInfo{
LockType: ast.SelectLockForUpdate,
},
}
b := bytes.NewByteBuffer([]byte{})
_ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b))
sql := string(b.Bytes())
log.Infof("build select sql by delete sourceQuery, sql {%s}", sql)
return sql, d.buildSelectArgs(&selStmt, args), nil
}
// afterImage build after image
func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
tableName, _ := d.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName)
if err != nil {
return nil, err
}
return types.NewEmptyRecordImage(metaData, types.SQLTypeDelete), nil
}