func transform()

in cmd/transform/transform.go [71:158]


// transform re-processes the rows of tableName one calendar month at a time
// and inserts the results into a sibling table named tableName+"2".
//
// idName selects the analyzer configuration (analyzer.GetAnalyzer) and also
// determines the ClickHouse database to connect to: the part of idName before
// the first "_", or the whole idName when it contains no underscore.
//
// The time range is taken from min/max(generated_time) of tableName and
// widened to whole months. Each month is handed to process. After a month,
// a batch send is scheduled if more than 10_000 items are queued. The insert
// manager is closed once all months are done.
func transform(clickHouseUrl string, idName string, tableName string) error {
	slog.Info("start transforming", "db", idName)

	// "foo_bar" -> "foo"; an idName without "_" is used as-is.
	dbName, _, _ := strings.Cut(idName, "_")
	targetTable := tableName + "2"

	db, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{clickHouseUrl},
		Auth: clickhouse.Auth{
			Database: dbName,
		},
		DialTimeout:     time.Second,
		ConnMaxLifetime: time.Hour,
		Settings: map[string]any{
			// https://github.com/ClickHouse/ClickHouse/issues/2833
			// ZSTD 19+ compression is slow, so read/write timeouts must be generous.
			// ClickHouse interprets both timeout settings in seconds.
			// NOTE(review): a previous comment stated "10 minutes", but these are
			// 30_000 s (~8.3 h) and 3_000 s (50 min) — confirm the intended values.
			"send_timeout":     30_000,
			"receive_timeout":  3000,
			"max_memory_usage": 100_000_000_000, // bytes
		},
	})
	if err != nil {
		return fmt.Errorf("cannot connect to clickhouse: %w", err)
	}
	defer util.Close(db)

	taskContext, cancel := util.CreateCommandContext()
	defer cancel()

	config := analyzer.GetAnalyzer(idName)
	config.TableName = targetTable

	insertReportManager, err := analyzer.NewInsertReportManager(taskContext, db, nil, config, targetTable, insertWorkerCount)
	if err != nil {
		return fmt.Errorf("cannot create insert report manager: %w", err)
	}

	insertManager := insertReportManager.InsertManager
	// Batches are not sent after every month: the loop below schedules a send
	// only when more than 10_000 items are queued, so allow larger batches here.
	insertManager.BatchSize = 50_000

	// The select result for a range is presumably held in memory, so the data
	// is processed in bounded one-month ranges rather than all at once.
	var minTime, maxTime time.Time
	// To try the transformer on fresh data only, use something like
	// (now() - toIntervalMonth(1)) instead of min(generated_time).
	// NOTE(review): tableName is concatenated into SQL unescaped; it must be a
	// trusted identifier (e.g. supplied by the operator, not by end users).
	err = db.QueryRow(taskContext, "select min(generated_time) as min, max(generated_time) as max from "+tableName).Scan(&minTime, &maxTime)
	if err != nil {
		return fmt.Errorf("cannot query min/max: %w", err)
	}

	slog.Info("time range", "start", minTime, "end", maxTime)

	// Widen to whole months: [1st of minTime's month, 1st of the month after maxTime).
	// time.Date normalizes month 13 to January of the following year, so
	// December needs no special case.
	minTime = time.Date(minTime.Year(), minTime.Month(), 1, 0, 0, 0, 0, minTime.Location())
	maxTime = time.Date(maxTime.Year(), maxTime.Month()+1, 1, 0, 0, 0, 0, maxTime.Location())

	for current := minTime; current.Before(maxTime); {
		// current is always the 1st of a month, so adding one month never
		// skips ahead because of day-of-month normalization.
		next := current.AddDate(0, 1, 0)
		if err := process(taskContext, db, config, current, next, insertReportManager, tableName); err != nil {
			return fmt.Errorf("cannot process %s - %s: %w", current.Format("2006-01-02"), next.Format("2006-01-02"), err)
		}
		current = next

		if insertManager.GetQueuedItemCount() > 10_000 {
			insertManager.ScheduleSendBatch()
		}
	}

	if err := insertManager.Close(); err != nil {
		return fmt.Errorf("cannot close insert manager: %w", err)
	}

	slog.Info("transforming finished")
	return nil
}