func()

in pkg/datasource/sql/exec/at/insert_on_update_executor.go [91:143]


// beforeImage queries the rows that the insert-on-duplicate-update statement
// may touch and returns them as a record image.
//
// Steps, in order:
//  1. reject the statement when isAstStmtValid reports false;
//  2. resolve the table name from the parser context and load its metadata
//     from the MySQL table cache;
//  3. build the SELECT statement and its arguments via buildBeforeImageSQL,
//     failing when no arguments were produced;
//  4. remember the SELECT statement and arguments on the executor
//     (beforeSelectSql / beforeSelectArgs);
//  5. run the SELECT on the underlying connection, which must implement
//     driver.QueryerContext or driver.Queryer;
//  6. convert the resulting rows with buildRecordImages.
//
// A nil image is returned together with any error.
func (i *insertOnUpdateExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
	if valid := i.isAstStmtValid(); !valid {
		log.Errorf("invalid insert statement! parser ctx:%+v", i.parserCtx)
		return nil, fmt.Errorf("invalid insert statement! parser ctx:%+v", i.parserCtx)
	}

	table, err := i.parserCtx.GetTableName()
	if err != nil {
		return nil, err
	}

	meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, table)
	if err != nil {
		return nil, err
	}

	query, args, err := i.buildBeforeImageSQL(i.parserCtx.InsertStmt, *meta, i.execContext.NamedValues)
	if err != nil {
		return nil, err
	}
	// Without any argument the generated SELECT cannot match a row, so the
	// statement is rejected instead of being executed.
	if len(args) == 0 {
		log.Errorf("the SQL statement has no primary key or unique index value, it will not hit any row data."+
			"recommend to convert to a normal insert statement. db name:%s table name:%s sql:%s", i.execContext.DBName, table, i.execContext.Query)
		return nil, fmt.Errorf("invalid insert or update sql")
	}

	// Keep the generated statement and its arguments on the executor.
	i.beforeSelectSql, i.beforeSelectArgs = query, args

	// Prefer the context-aware query interface; fall back to the plain one.
	// Exactly one of the two variables is non-nil after the switch.
	var (
		ctxQueryer   driver.QueryerContext
		plainQueryer driver.Queryer
	)
	switch conn := i.execContext.Conn.(type) {
	case driver.QueryerContext:
		ctxQueryer = conn
	case driver.Queryer:
		plainQueryer = conn
	default:
		log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
		return nil, fmt.Errorf("invalid conn")
	}

	var rows driver.Rows
	rows, err = util.CtxDriverQuery(ctx, ctxQueryer, plainQueryer, query, args)
	// Schedule the close before looking at err so that rows handed back
	// alongside an error are released as well.
	if rows != nil {
		defer rows.Close()
	}
	if err != nil {
		log.Errorf("ctx driver query: %+v", err)
		return nil, err
	}

	image, err := i.buildRecordImages(rows, meta, types.SQLTypeInsertOnDuplicateUpdate)
	if err != nil {
		return nil, err
	}
	return image, nil
}