func()

in pkg/datasource/sql/undo/builder/mysql_insert_undo_log_builder.go [88:139]


// buildAfterImageSQL builds the SELECT statement, and its bind parameters,
// that look up the rows touched by the INSERT in execCtx by primary key.
//
// The returned SQL has the form "SELECT * FROM <table> WHERE <pk condition> ";
// the condition text comes from buildWhereConditionByPKs and the parameters
// from buildPKParams. ctx is accepted for signature compatibility and is not
// read by this method.
//
// An error is returned when execCtx (or its parse context / insert statement)
// is missing, when the target table cannot be resolved from the statement,
// when no metadata is registered for that table, when the primary-key
// metadata is empty or inconsistent, or when a primary-key column carries
// fewer values than there are rows.
func (u *MySQLInsertUndoLogBuilder) buildAfterImageSQL(ctx context.Context, execCtx *types.ExecContext) (string, []driver.Value, error) {
	// get all pk value
	if execCtx == nil || execCtx.ParseContext == nil || execCtx.ParseContext.InsertStmt == nil {
		return "", nil, fmt.Errorf("can't found execCtx or ParseContext or InsertStmt")
	}
	parseCtx := execCtx.ParseContext

	// Resolve the target table step by step: an unchecked assertion chain
	// would panic on any statement shape other than a plain table reference.
	tableClause := parseCtx.InsertStmt.Table
	if tableClause == nil || tableClause.TableRefs == nil {
		return "", nil, fmt.Errorf("insert statement has no table reference")
	}
	tableSource, ok := tableClause.TableRefs.Left.(*ast.TableSource)
	if !ok || tableSource == nil {
		return "", nil, fmt.Errorf("insert target is not a table source")
	}
	tableNode, ok := tableSource.Source.(*ast.TableName)
	if !ok || tableNode == nil {
		return "", nil, fmt.Errorf("insert target is not a table name")
	}
	tableName := tableNode.Name.O

	if execCtx.MetaDataMap == nil {
		return "", nil, fmt.Errorf("can't found  MetaDataMap")
	}
	meta, ok := execCtx.MetaDataMap[tableName]
	if !ok {
		return "", nil, fmt.Errorf("table meta not found for table %s", tableName)
	}
	pkValuesMap, err := u.getPkValues(execCtx, parseCtx, meta)
	if err != nil {
		return "", nil, err
	}

	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return "", nil, fmt.Errorf("Pk columnName size is zero")
	}

	dataTypeMap, err := meta.GetPrimaryKeyTypeStrMap()
	if err != nil {
		return "", nil, err
	}
	if len(dataTypeMap) != len(pkColumnNameList) {
		return "", nil, fmt.Errorf("PK columnName size don't equal PK DataType size")
	}

	// The first PK column defines the row count. Every other PK column must
	// supply at least that many values, otherwise indexing below would panic.
	rowSize := len(pkValuesMap[pkColumnNameList[0]])
	for _, name := range pkColumnNameList {
		if got := len(pkValuesMap[name]); got < rowSize {
			return "", nil, fmt.Errorf("pk column %s has %d values, expected %d", name, got, rowSize)
		}
	}

	// One single-column RowImage is emitted per (row, pk column) pair, in
	// row-major order; this matches the original layout handed to buildPKParams.
	pkRowImages := make([]types.RowImage, 0, rowSize*len(pkColumnNameList))
	for i := 0; i < rowSize; i++ {
		for _, name := range pkColumnNameList {
			pkRowImages = append(pkRowImages, types.RowImage{
				Columns: []types.ColumnImage{{
					KeyType:    types.IndexTypePrimaryKey,
					ColumnName: name,
					ColumnType: types.MySQLStrToJavaType(dataTypeMap[name]),
					Value:      pkValuesMap[name][i],
				}},
			})
		}
	}

	// build check sql
	sb := strings.Builder{}
	sb.WriteString("SELECT * FROM " + tableName)
	whereSQL := u.buildWhereConditionByPKs(pkColumnNameList, rowSize, "mysql", maxInSize)
	sb.WriteString(" WHERE " + whereSQL + " ")
	return sb.String(), u.buildPKParams(pkRowImages, pkColumnNameList), nil
}