in pkg/datasource/sql/exec/at/insert_on_update_executor.go [91:143]
// beforeImage runs the select produced by buildBeforeImageSQL for the parsed
// insert statement and converts the returned rows into a record image via
// buildRecordImages.
//
// An error is returned when the parsed statement is not valid, when the table
// name or the table metadata cannot be obtained, when the generated select
// has no arguments, when the connection implements neither
// driver.QueryerContext nor driver.Queryer, or when the query or the image
// construction fails. The generated SQL and its arguments are stored on the
// executor before the query is issued.
func (i *insertOnUpdateExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
	if !i.isAstStmtValid() {
		log.Errorf("invalid insert statement! parser ctx:%+v", i.parserCtx)
		return nil, fmt.Errorf("invalid insert statement! parser ctx:%+v", i.parserCtx)
	}

	table, err := i.parserCtx.GetTableName()
	if err != nil {
		return nil, err
	}

	meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, table)
	if err != nil {
		return nil, err
	}

	query, args, err := i.buildBeforeImageSQL(i.parserCtx.InsertStmt, *meta, i.execContext.NamedValues)
	if err != nil {
		return nil, err
	}
	// A select without arguments is rejected; the log line explains that the
	// statement carries no primary key or unique index value.
	if len(args) == 0 {
		log.Errorf("the SQL statement has no primary key or unique index value, it will not hit any row data."+
			"recommend to convert to a normal insert statement. db name:%s table name:%s sql:%s",
			i.execContext.DBName, table, i.execContext.Query)
		return nil, fmt.Errorf("invalid insert or update sql")
	}
	i.beforeSelectSql, i.beforeSelectArgs = query, args

	// Prefer the context-aware queryer; only probe for the plain
	// driver.Queryer when the connection does not offer it.
	conn := i.execContext.Conn
	ctxQueryer, hasCtxQueryer := conn.(driver.QueryerContext)
	var plainQueryer driver.Queryer
	if !hasCtxQueryer {
		var hasPlainQueryer bool
		if plainQueryer, hasPlainQueryer = conn.(driver.Queryer); !hasPlainQueryer {
			log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
			return nil, fmt.Errorf("invalid conn")
		}
	}

	rows, err := util.CtxDriverQuery(ctx, ctxQueryer, plainQueryer, query, args)
	// Register the close before inspecting err so that rows handed back
	// together with an error are still released.
	if rows != nil {
		defer rows.Close()
	}
	if err != nil {
		log.Errorf("ctx driver query: %+v", err)
		return nil, err
	}

	recordImage, err := i.buildRecordImages(rows, meta, types.SQLTypeInsertOnDuplicateUpdate)
	if err != nil {
		return nil, err
	}
	return recordImage, nil
}