in backend/plugins/starrocks/tasks/tasks.go [263:378]
// copyDataToDst streams every row of the source table dc.SrcTableName
// (filtered by the table's configured Where clause, if any, and ordered by
// orderBy) into the temporary StarRocks table "<DestTableName>_tmp" in batches
// of config.BatchSize rows, then drops the destination table and renames the
// temporary table over it.
//
// columnMap maps a source column name to its destination data type; columns
// whose type starts with "array" are scanned through pq.Array into a []string.
//
// After the swap, the source and destination row counts are compared and a
// warning is logged when they differ. The function returns the first error
// produced by the cursor, a scan, a batch upload, the table swap, or the
// counts; it returns the context error if the task context is cancelled while
// iterating.
func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy string) error {
	c := dc.Ctx
	logger := dc.Ctx.GetLogger()
	config := dc.Config
	db := dc.SrcDb
	starrocksDb := dc.DestDb
	table := dc.SrcTableName
	starrocksTable := dc.DestTableName
	starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)

	// An absent table config leaves the filter empty.
	where := ""
	if tableConfig, ok := config.TableConfigs[table]; ok {
		where = tableConfig.Where
	}

	// NOTE(review): offset is never advanced in this function, so every batch
	// is submitted with offset 0 — confirm whether putBatchData expects a
	// running offset.
	var offset int

	// openCursor issues the (optionally filtered) ordered scan of the source table.
	openCursor := func() (dal.Rows, error) {
		return db.Cursor(
			dal.From(table),
			dal.Orderby(orderBy),
			dal.Where(where),
		)
	}

	rows, err := openCursor()
	if err != nil {
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			return err
		}
		// This specific error is retried exactly once with an identical query.
		logger.Warn(err, "skip err: cached plan must not change result type")
		rows, err = openCursor()
		if err != nil {
			return err
		}
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return err
	}

	var data []map[string]interface{}
	var batchCount int
	// NOTE(review): a false return from Next is treated as end-of-data; an
	// iteration error, if the cursor can report one, is not inspected here.
	for rows.Next() {
		// Abort promptly when the task context has been cancelled.
		select {
		case <-c.GetContext().Done():
			return c.GetContext().Err()
		default:
		}

		row := make(map[string]interface{}, len(cols))
		columns := make([]interface{}, len(cols))
		columnPointers := make([]interface{}, len(cols))
		for i := range columns {
			dataType := columnMap[cols[i]]
			if strings.HasPrefix(dataType, "array") {
				// Array columns are decoded into a fresh []string per row;
				// the row stores the pointer to that slice.
				var arr []string
				columns[i] = &arr
				columnPointers[i] = pq.Array(&arr)
			} else {
				columnPointers[i] = &columns[i]
			}
		}
		if err = rows.Scan(columnPointers...); err != nil {
			return err
		}
		for i, colName := range cols {
			row[colName] = columns[i]
		}

		data = append(data, row)
		batchCount++
		// NOTE(review): with BatchSize == 0 this never matches and all rows
		// are sent in the single trailing batch below.
		if batchCount == config.BatchSize {
			if err = putBatchData(c, starrocksTmpTable, table, data, config, offset); err != nil {
				return err
			}
			batchCount = 0
			data = nil
		}
	}

	// Flush the trailing partial batch, if any.
	if batchCount != 0 {
		if err = putBatchData(c, starrocksTmpTable, table, data, config, offset); err != nil {
			return err
		}
	}

	// Replace the destination table with the freshly loaded temporary table.
	// The two statements are issued separately: if the rename fails, the old
	// table has already been dropped.
	err = starrocksDb.Exec("DROP TABLE IF EXISTS ?", clause.Table{Name: starrocksTable})
	if err != nil {
		return err
	}
	err = starrocksDb.Exec("ALTER TABLE ? RENAME ?", clause.Table{Name: starrocksTmpTable}, clause.Table{Name: starrocksTable})
	if err != nil {
		return err
	}

	// Sanity-check the row counts. The source is counted with the same filter
	// that was used for the copy; counting the whole table would report a
	// mismatch whenever a Where filter is configured.
	sourceCount, err := db.Count(dal.From(table), dal.Where(where))
	if err != nil {
		return err
	}
	starrocksCount, err := starrocksDb.Count(dal.From(starrocksTable))
	if err != nil {
		return err
	}
	if sourceCount != starrocksCount {
		logger.Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
	}
	logger.Info("load %s to starrocks success", table)
	return nil
}