func()

in pkg/datasource/sql/exec/at/insert_executor.go [143:199]


func (i *insertExecutor) buildAfterImageSQL(ctx context.Context) (string, []driver.NamedValue, error) {
	// get all pk value
	tableName, _ := i.parserCtx.GetTableName()

	meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return "", nil, err
	}
	pkValuesMap, err := i.getPkValues(ctx, i.execContext, i.parserCtx, *meta)
	if err != nil {
		return "", nil, err
	}

	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return "", nil, fmt.Errorf("Pk columnName size is zero")
	}

	dataTypeMap, err := meta.GetPrimaryKeyTypeStrMap()
	if err != nil {
		return "", nil, err
	}
	if len(dataTypeMap) != len(pkColumnNameList) {
		return "", nil, fmt.Errorf("PK columnName size don't equal PK DataType size")
	}
	var pkRowImages []types.RowImage

	rowSize := len(pkValuesMap[pkColumnNameList[0]])
	for i := 0; i < rowSize; i++ {
		for _, name := range pkColumnNameList {
			tmpKey := name
			tmpArray := pkValuesMap[tmpKey]
			pkRowImages = append(pkRowImages, types.RowImage{
				Columns: []types.ColumnImage{{
					KeyType:    types.IndexTypePrimaryKey,
					ColumnName: tmpKey,
					ColumnType: types.MySQLStrToJavaType(dataTypeMap[tmpKey]),
					Value:      tmpArray[i],
				}},
			})
		}
	}
	// build check sql
	sb := strings.Builder{}
	suffix := strings.Builder{}
	var insertColumns []string

	for _, column := range i.parserCtx.InsertStmt.Columns {
		insertColumns = append(insertColumns, column.Name.O)
	}
	sb.WriteString("SELECT " + strings.Join(i.getNeedColumns(meta, insertColumns, types.DBTypeMySQL), ", "))
	suffix.WriteString(" FROM " + tableName)
	whereSQL := i.buildWhereConditionByPKs(pkColumnNameList, rowSize, "mysql", maxInSize)
	suffix.WriteString(" WHERE " + whereSQL + " ")
	sb.WriteString(suffix.String())
	return sb.String(), i.buildPKParams(pkRowImages, pkColumnNameList), nil
}