in backend/plugins/starrocks/tasks/tasks.go [132:229]
// createTmpTableInStarrocks drops and recreates the staging table
// "<DestTableName>_tmp" in the destination (StarRocks) database, with one
// column per column of the source table and data types mapped through
// utils.GetStarRocksDataType.
//
// Returns, in order:
//   - a map from source column name to the StarRocks data type chosen for it;
//   - an ORDER BY expression: config.OrderBy[table] when configured, otherwise
//     the source primary-key columns quoted for the source dialect, otherwise
//     the first column (presumably used by the caller to read the source
//     table in a stable order — confirm against the caller);
//   - skip: true when config.UpdateColumn is a column of the source table and
//     its greatest value is equal in the source and destination tables; in
//     that case no table is created and the other results are zero values;
//   - an error if reading metadata fails, the source dialect is neither
//     "postgres" nor "mysql", the source table has no columns, a column type
//     cannot be determined, or the DDL fails.
func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, bool, error) {
	logger := dc.Ctx.GetLogger()
	config := dc.Config
	db := dc.SrcDb
	starrocksDb := dc.DestDb
	table := dc.SrcTableName
	starrocksTable := dc.DestTableName
	starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
	updateColumn := config.UpdateColumn

	columnMetas, err := db.GetColumns(&Table{name: table}, nil)
	if err != nil && strings.Contains(err.Error(), "cached plan must not change result type") {
		// This particular error is retried exactly once before giving up.
		logger.Warn(err, "skip err: cached plan must not change result type")
		columnMetas, err = db.GetColumns(&Table{name: table}, nil)
	}
	if err != nil {
		return nil, "", false, err
	}

	// Identifier quote character of the *source* database; it is only used
	// for the returned ORDER BY expression. DDL sent to StarRocks always
	// uses backticks.
	var separator string
	switch dialect := db.Dialect(); dialect {
	case "postgres":
		separator = "\""
	case "mysql":
		separator = "`"
	default:
		return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", dialect))
	}

	// Without columns the generated DDL would be "CREATE TABLE ( ) ... hash()",
	// which can only fail with an opaque SQL error; fail early and clearly.
	if len(columnMetas) == 0 {
		return nil, "", false, errors.NotFound.New(fmt.Sprintf("no columns found for table %s", table))
	}

	columnMap := make(map[string]string, len(columnMetas))
	columns := make([]string, 0, len(columnMetas))
	var pks, orders []string
	var firstcm, firstcmName string
	for _, cm := range columnMetas {
		name := cm.Name()
		if name == updateColumn {
			// Compare the newest value of the update column on both sides to
			// decide whether the whole table can be skipped.
			var updatedFrom time.Time
			err = db.All(&updatedFrom, dal.Select(updateColumn), dal.From(table), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
			if err != nil {
				return nil, "", false, err
			}
			var updatedTo time.Time
			err = starrocksDb.All(&updatedTo, dal.Select(updateColumn), dal.From(starrocksTable), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
			if err != nil {
				// An "Unknown table" error from the destination is tolerated
				// (no skip, carry on); any other error aborts.
				if !strings.Contains(err.Error(), "Unknown table") {
					return nil, "", false, err
				}
			} else if updatedFrom.Equal(updatedTo) {
				return nil, "", true, nil
			}
		}

		columnDatatype, ok := cm.ColumnType()
		if !ok {
			// Do not hand back a half-populated map together with an error.
			return nil, "", false, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
		}
		dataType := utils.GetStarRocksDataType(columnDatatype)
		columnMap[name] = dataType
		columns = append(columns, fmt.Sprintf("`%s` %s", name, dataType))

		if isPrimaryKey, ok := cm.PrimaryKey(); isPrimaryKey && ok {
			pks = append(pks, fmt.Sprintf("`%s`", name))
			orders = append(orders, fmt.Sprintf("%s%s%s", separator, name, separator))
		}
		if firstcm == "" {
			// Remember the first column as the fallback hash key / sort key.
			firstcm = fmt.Sprintf("`%s`", name)
			firstcmName = fmt.Sprintf("%s%s%s", separator, name, separator)
		}
	}
	if len(pks) == 0 {
		pks = append(pks, firstcm)
	}

	// Indexing a nil map is safe in Go, so no explicit nil guard is needed
	// for the per-table overrides below.
	orderBy := strings.Join(orders, ", ")
	if v, ok := config.OrderBy[table]; ok {
		orderBy = v
	}
	if orderBy == "" {
		orderBy = firstcmName
	}

	extra := fmt.Sprintf(`engine=olap distributed by hash(%s) properties("replication_num" = "1")`, strings.Join(pks, ", "))
	if v, ok := config.Extra[table]; ok {
		extra = v
	}

	// Quote the table name in BOTH statements so DROP and CREATE are
	// guaranteed to address the same table.
	tableSql := fmt.Sprintf("DROP TABLE IF EXISTS `%[1]s`; CREATE TABLE IF NOT EXISTS `%[1]s` ( %[2]s ) %[3]s", starrocksTmpTable, strings.Join(columns, ","), extra)
	logger.Debug(tableSql)
	err = starrocksDb.Exec(tableSql)
	return columnMap, orderBy, false, err
}