in cmd/transform/transform.go [71:158]
// transform runs process over tableName one calendar month at a time.
//
// clickHouseUrl is the ClickHouse server address. idName selects the analyzer
// configuration via analyzer.GetAnalyzer; the part of idName before the first
// "_" (or the whole value when there is no "_") is used as the database name.
// tableName is the table whose generated_time range is scanned; the insert
// report manager handed to process is created for tableName + "2".
//
// It returns an error when the connection cannot be opened, the insert report
// manager cannot be created, the min/max query fails, any month fails to
// process, or the insert manager fails to close.
func transform(clickHouseUrl string, idName string, tableName string) error {
	slog.Info("start transforming", "db", idName)

	// The database name is the prefix of idName up to the first underscore;
	// Cut yields the whole string when no underscore is present.
	dbName, _, _ := strings.Cut(idName, "_")

	db, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{clickHouseUrl},
		Auth: clickhouse.Auth{
			Database: dbName,
		},
		DialTimeout:     time.Second,
		ConnMaxLifetime: time.Hour,
		Settings: map[string]any{
			// https://github.com/ClickHouse/ClickHouse/issues/2833
			// ZSTD 19+ is used, read/write timeout should be quite large (10 minutes)
			// NOTE(review): the values below do not obviously correspond to
			// "10 minutes" - confirm the intended units and durations.
			"send_timeout":     30_000,
			"receive_timeout":  3000,
			"max_memory_usage": 100000000000,
		},
	})
	if err != nil {
		return fmt.Errorf("cannot connect to clickhouse: %w", err)
	}
	defer util.Close(db)

	taskContext, cancel := util.CreateCommandContext()
	defer cancel()

	// The destination table is the source table name with a "2" suffix.
	targetTableName := tableName + "2"
	config := analyzer.GetAnalyzer(idName)
	config.TableName = targetTableName

	insertReportManager, err := analyzer.NewInsertReportManager(taskContext, db, nil, config, targetTableName, insertWorkerCount)
	if err != nil {
		return fmt.Errorf("cannot create insert report manager: %w", err)
	}
	// NOTE(review): the insert manager is closed only on the success path
	// below; confirm whether it should also be closed when a month fails.
	insertManager := insertReportManager.InsertManager
	// we send batch in the end of each iteration
	insertManager.BatchSize = 50_000

	var minTime time.Time
	var maxTime time.Time
	// use something like (now() - toIntervalMonth(1)) to test the transformer on a fresh data
	// tableName is concatenated into the statement as-is, so it must be a trusted identifier.
	err = db.QueryRow(taskContext, "select min(generated_time) as min, max(generated_time) as max from "+tableName).Scan(&minTime, &maxTime)
	if err != nil {
		return fmt.Errorf("cannot query min/max: %w", err)
	}
	slog.Info("time range", "start", minTime, "end", maxTime)

	// round to the start of the month
	minTime = time.Date(minTime.Year(), minTime.Month(), 1, 0, 0, 0, 0, minTime.Location())
	// Round up to the start of the following month. time.Date normalizes an
	// out-of-range month, so December+1 becomes January of the next year.
	maxTime = time.Date(maxTime.Year(), maxTime.Month()+1, 1, 0, 0, 0, 0, maxTime.Location())

	// Walk the range in one-month steps; current is always the first day of a
	// month, so AddDate(0, 1, 0) lands exactly on the first day of the next one.
	current := minTime
	for current.Before(maxTime) {
		next := current.AddDate(0, 1, 0)
		if err := process(taskContext, db, config, current, next, insertReportManager, tableName); err != nil {
			return fmt.Errorf("cannot process month %s: %w", current.Format("2006-01"), err)
		}
		current = next

		// Schedule a send once enough items have been queued.
		if insertManager.GetQueuedItemCount() > 10_000 {
			insertManager.ScheduleSendBatch()
		}
	}

	if err := insertReportManager.InsertManager.Close(); err != nil {
		return fmt.Errorf("cannot close insert manager: %w", err)
	}
	slog.Info("transforming finished")
	return nil
}