func createTmpTableInStarrocks()

in backend/plugins/starrocks/tasks/tasks.go [132:229]


// createTmpTableInStarrocks (re)creates the temporary table
// "<DestTableName>_tmp" in the destination database, with one column per
// column reported by the source database for SrcTableName.
//
// Return values, in order:
//   - a map from source column name to the type returned by
//     utils.GetStarRocksDataType for that column;
//   - the order-by expression: config.OrderBy[table] when configured,
//     otherwise the primary key columns quoted for the source dialect,
//     otherwise the first column;
//   - skip, which is true when config.UpdateColumn is a column of the source
//     table and its greatest value equals the greatest value already in the
//     destination table; no DDL is executed in that case;
//   - an error. On any error the other results are zero values.
//
// Only the "postgres" and "mysql" source dialects are supported.
func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, bool, error) {
	logger := dc.Ctx.GetLogger()
	config := dc.Config
	db := dc.SrcDb
	starrocksDb := dc.DestDb
	table := dc.SrcTableName
	starrocksTable := dc.DestTableName
	starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
	updateColumn := config.UpdateColumn

	columnMetas, err := db.GetColumns(&Table{name: table}, nil)
	if err != nil {
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			return nil, "", false, err
		}
		// This specific error is retried exactly once.
		// NOTE(review): presumably caused by a stale prepared statement on the
		// source connection after a schema change - confirm.
		logger.Warn(err, "skip err: cached plan must not change result type")
		columnMetas, err = db.GetColumns(&Table{name: table}, nil)
		if err != nil {
			return nil, "", false, err
		}
	}

	// separator is the identifier quote of the source dialect; it is only used
	// for the order-by expression. Column definitions sent to the destination
	// always use backticks.
	var separator string
	switch dialect := db.Dialect(); dialect {
	case "postgres":
		separator = "\""
	case "mysql":
		separator = "`"
	default:
		return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", dialect))
	}

	// Without columns the generated statement would have an empty column list
	// and an empty hash() clause, so fail before touching the destination.
	if len(columnMetas) == 0 {
		return nil, "", false, errors.Default.New(fmt.Sprintf("no columns found for table %s", table))
	}

	columnMap := make(map[string]string, len(columnMetas))
	columns := make([]string, 0, len(columnMetas))
	var pks, orders []string
	var firstcm, firstcmName string
	for _, cm := range columnMetas {
		name := cm.Name()
		if name == updateColumn {
			// Compare the latest value of the update column on both sides to
			// decide whether the table can be skipped.
			var updatedFrom time.Time
			err = db.All(&updatedFrom, dal.Select(updateColumn), dal.From(table), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
			if err != nil {
				return nil, "", false, err
			}

			var updatedTo time.Time
			err = starrocksDb.All(&updatedTo, dal.Select(updateColumn), dal.From(starrocksTable), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
			if err != nil {
				// An "Unknown table" error is tolerated: the table is then
				// created below instead of being skipped.
				if !strings.Contains(err.Error(), "Unknown table") {
					return nil, "", false, err
				}
			} else if updatedFrom.Equal(updatedTo) {
				return nil, "", true, nil
			}
		}

		columnDatatype, ok := cm.ColumnType()
		if !ok {
			return nil, "", false, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
		}
		dataType := utils.GetStarRocksDataType(columnDatatype)
		columnMap[name] = dataType
		columns = append(columns, fmt.Sprintf("`%s` %s", name, dataType))

		if isPrimaryKey, ok := cm.PrimaryKey(); isPrimaryKey && ok {
			pks = append(pks, fmt.Sprintf("`%s`", name))
			orders = append(orders, fmt.Sprintf("%s%s%s", separator, name, separator))
		}
		// Remember the first column as the fallback for tables that report
		// no primary key.
		if firstcm == "" {
			firstcm = fmt.Sprintf("`%s`", name)
			firstcmName = fmt.Sprintf("%s%s%s", separator, name, separator)
		}
	}

	if len(pks) == 0 {
		pks = append(pks, firstcm)
	}

	orderBy := strings.Join(orders, ", ")
	if v, ok := config.OrderBy[table]; ok {
		orderBy = v
	}
	if orderBy == "" {
		orderBy = firstcmName
	}

	extra := fmt.Sprintf(`engine=olap distributed by hash(%s) properties("replication_num" = "1")`, strings.Join(pks, ", "))
	if v, ok := config.Extra[table]; ok {
		extra = v
	}

	// The table name is backtick-quoted in both statements so that DROP and
	// CREATE always refer to the same identifier.
	tableSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`; CREATE TABLE IF NOT EXISTS `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
	logger.Debug(tableSql)
	if err = starrocksDb.Exec(tableSql); err != nil {
		return nil, "", false, err
	}
	return columnMap, orderBy, false, nil
}