cmd/copy-table/copy.go (123 lines of code) (raw):

package main import ( "context" "fmt" "log/slog" "os" "strings" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) func main() { sourceTable := "perfintDev.idea" targetTable := "perfintDev.idea2" err := copyTable("localhost:9000", sourceTable, targetTable) if err != nil { slog.Error("copy failed", "error", err) os.Exit(1) } } func parseTableName(fullTableName string) string { parts := strings.Split(fullTableName, ".") if len(parts) == 2 { return parts[0] } return "default" } func copyTable(clickHouseUrl string, sourceTable string, targetTable string) error { slog.Info("start copying", "source", sourceTable, "target", targetTable) sourceDb := parseTableName(sourceTable) db, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{clickHouseUrl}, Auth: clickhouse.Auth{ Database: sourceDb, }, DialTimeout: time.Second, ConnMaxLifetime: time.Hour, Settings: map[string]any{ "send_timeout": 30_000, "receive_timeout": 3000, "max_memory_usage": 100000000000, }, }) if err != nil { return fmt.Errorf("cannot connect to clickhouse: %w", err) } defer db.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Get time range for processing var minTime, maxTime time.Time query := "SELECT min(generated_time) as min, max(generated_time) as max FROM " + sourceTable err = db.QueryRow(ctx, query).Scan(&minTime, &maxTime) if err != nil { return fmt.Errorf("cannot query min/max: %w", err) } slog.Info("time range", "start", minTime, "end", maxTime) // Round to month boundaries minTime = time.Date(minTime.Year(), minTime.Month(), 1, 0, 0, 0, 0, minTime.Location()) if maxTime.Month() == 12 { maxTime = time.Date(maxTime.Year()+1, 1, 1, 0, 0, 0, 0, maxTime.Location()) } else { maxTime = time.Date(maxTime.Year(), maxTime.Month()+1, 1, 0, 0, 0, 0, maxTime.Location()) } // Process data month by month totalCopied := uint64(0) for current := minTime; current.Before(maxTime); { next := current.AddDate(0, 1, 0) copied, err := processMonthWithDailyBatches(ctx, db, sourceTable, targetTable, current, next) if err != nil { return fmt.Errorf("failed to process range %v to %v: %w", current, next, err) } totalCopied += copied current = next } slog.Info("copying finished", "total_rows_copied", totalCopied) return nil } func processMonthWithDailyBatches(ctx context.Context, db driver.Conn, sourceTable, targetTable string, startTime, endTime time.Time) (uint64, error) { slog.Info("processing month with daily batches", "start", startTime, "end", endTime) totalCopied := uint64(0) batchStart := startTime batchNum := 1 // Process in daily batches for batchStart.Before(endTime) { batchEnd := batchStart.AddDate(0, 0, 1) // Add 1 day if batchEnd.After(endTime) { batchEnd = endTime } // Check row count for this day var batchCount uint64 countQuery := fmt.Sprintf(` SELECT count(*) FROM %s WHERE generated_time >= ? AND generated_time < ? `, sourceTable) err := db.QueryRow(ctx, countQuery, batchStart, batchEnd).Scan(&batchCount) if err != nil { return totalCopied, fmt.Errorf("cannot count batch rows: %w", err) } if batchCount > 0 { slog.Info("copying daily batch", "batch_number", batchNum, "date", batchStart.Format("2006-01-02"), "rows", batchCount) // Copy this day's data insertQuery := fmt.Sprintf(` INSERT INTO %s SELECT * FROM %s WHERE generated_time >= ? AND generated_time < ? `, targetTable, sourceTable) err = db.Exec(ctx, insertQuery, batchStart, batchEnd) if err != nil { return totalCopied, fmt.Errorf("cannot execute INSERT SELECT for batch %d (date %s): %w", batchNum, batchStart.Format("2006-01-02"), err) } totalCopied += batchCount slog.Info("daily batch copied successfully", "batch_number", batchNum, "date", batchStart.Format("2006-01-02"), "rows", batchCount, "total_copied", totalCopied) } batchStart = batchEnd batchNum++ } slog.Info("month processed successfully", "total_rows", totalCopied) return totalCopied, nil }