func copyDataToDst()

in backend/plugins/starrocks/tasks/tasks.go [263:378]


// copyDataToDst copies the rows of the source table dc.SrcTableName into the
// StarRocks table "<dc.DestTableName>_tmp" in batches, then drops
// dc.DestTableName and renames the temporary table to take its place.
//
// Parameters:
//   - dc: carries the sub-task context, plugin config, source and destination
//     database handles, and the source/destination table names.
//   - columnMap: looked up by source column name; a value starting with "array"
//     makes the column be scanned through pq.Array into a []string.
//   - orderBy: passed to the source query as its ORDER BY clause.
//
// If config.TableConfigs has an entry for the source table, its Where filter
// is applied both to the copy query and to the final row-count sanity check,
// so the two counts are comparable.
//
// It returns the first error from opening the cursor, scanning, uploading a
// batch, swapping the tables or counting rows, or the context error if the
// sub-task context is cancelled while rows are being read. A row-count
// mismatch is only logged as a warning.
func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy string) error {
	c := dc.Ctx
	logger := dc.Ctx.GetLogger()
	config := dc.Config
	db := dc.SrcDb
	starrocksDb := dc.DestDb
	table := dc.SrcTableName
	starrocksTable := dc.DestTableName
	starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)

	where := ""
	if tableConfig, ok := config.TableConfigs[table]; ok {
		where = tableConfig.Where
	}

	// NOTE(review): offset is never advanced, so every batch is uploaded with
	// offset 0. Kept as-is because putBatchData is not visible here; confirm
	// whether it relies on the value.
	var offset int

	openCursor := func() (dal.Rows, error) {
		return db.Cursor(
			dal.From(table),
			dal.Orderby(orderBy),
			dal.Where(where),
		)
	}
	rows, err := openCursor()
	if err != nil && strings.Contains(err.Error(), "cached plan must not change result type") {
		// This specific error is retried exactly once; any other error, or a
		// second failure, is returned to the caller.
		logger.Warn(err, "skip err: cached plan must not change result type")
		rows, err = openCursor()
	}
	if err != nil {
		return err
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return err
	}

	var data []map[string]interface{}
	// flush uploads the pending rows (if any) to the temporary table and
	// starts a fresh batch.
	flush := func() error {
		if len(data) == 0 {
			return nil
		}
		if err := putBatchData(c, starrocksTmpTable, table, data, config, offset); err != nil {
			return err
		}
		data = nil
		return nil
	}

	for rows.Next() {
		// Stop promptly if the sub-task has been cancelled.
		select {
		case <-c.GetContext().Done():
			return c.GetContext().Err()
		default:
		}

		columns := make([]interface{}, len(cols))
		columnPointers := make([]interface{}, len(cols))
		for i := range columns {
			if strings.HasPrefix(columnMap[cols[i]], "array") {
				// Array columns are decoded by pq.Array into a []string; the
				// row keeps a pointer to that slice.
				var arr []string
				columns[i] = &arr
				columnPointers[i] = pq.Array(&arr)
			} else {
				columnPointers[i] = &columns[i]
			}
		}
		if err = rows.Scan(columnPointers...); err != nil {
			return err
		}

		row := make(map[string]interface{}, len(cols))
		for i, colName := range cols {
			row[colName] = columns[i]
		}
		data = append(data, row)

		if len(data) == config.BatchSize {
			if err = flush(); err != nil {
				return err
			}
		}
	}
	// Upload the trailing partial batch.
	if err = flush(); err != nil {
		return err
	}

	// Replace the destination table with the freshly loaded temporary table.
	// NOTE(review): drop + rename is two statements; if the rename fails the
	// destination table is already gone.
	err = starrocksDb.Exec("DROP TABLE IF EXISTS ?", clause.Table{Name: starrocksTable})
	if err != nil {
		return err
	}
	err = starrocksDb.Exec("ALTER TABLE ? RENAME ?", clause.Table{Name: starrocksTmpTable}, clause.Table{Name: starrocksTable})
	if err != nil {
		return err
	}

	// Sanity check: compare row counts. The source count uses the same Where
	// filter as the copy query, otherwise a configured filter would always
	// produce a spurious mismatch warning.
	sourceCount, err := db.Count(dal.From(table), dal.Where(where))
	if err != nil {
		return err
	}
	starrocksCount, err := starrocksDb.Count(dal.From(starrocksTable))
	if err != nil {
		return err
	}
	if sourceCount != starrocksCount {
		logger.Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
	}
	logger.Info("load %s to starrocks success", table)
	return nil
}