in data_migrations/controlplane/1726744316_add_span_id_to_trace/files/migrate.go [274:417]
// main drives the repopulation of cosmo.otel_traces for the span-ID migration.
//
// Usage:
//
//	<program> <connection-string> [--retry | --resume]
//
// Modes:
//   - no flag: truncates success_log.txt and error_log.txt, recreates the
//     tables via RecreateTables, and processes one-hour ranges from
//     min(Timestamp) (truncated to the hour) up to the timestamp returned by
//     RecreateTables. That end timestamp is written as the first line of
//     success_log.txt so a later --resume run can reuse it.
//   - --resume: takes the end timestamp and already-processed ranges from
//     ReadSuccessLog and skips any range recorded there.
//   - --retry: re-runs only the "<start> <end> ..." ranges listed in
//     error_log.txt, after truncating that file.
//
// Each range is dispatched on its own goroutine; the semaphore channel of
// capacity ConcurrencyLimit is handed to ExecuteSQLCommand, which is
// presumably responsible for acquiring/releasing it and calling wg.Done —
// NOTE(review): confirm against ExecuteSQLCommand.
func main() {
	// Guard before indexing os.Args; avoids os.Args[0] in case it is absent.
	if len(os.Args) < 2 {
		log.Fatal("usage: <program> <connection-string> [--retry | --resume]")
	}
	repopulator := Repopulator{
		ConnectionString: os.Args[1],
	}

	retry := false
	resume := false
	if len(os.Args) > 2 {
		switch os.Args[2] {
		case "--retry":
			retry = true
		case "--resume":
			resume = true
		default:
			// A fresh run is destructive (tables recreated, logs wiped), so an
			// unrecognized or mistyped flag must not silently fall through to it.
			log.Fatalf("Unknown flag %q. Expected --retry or --resume.", os.Args[2])
		}
	}

	var startTimestampStr string
	var endTimestamp time.Time
	var processedRanges map[string]bool

	switch {
	case retry:
		log.Println("Retry mode enabled. Processing dates and hours from error_log.txt.")
	case resume:
		startTimestampStr = repopulator.GetMinMaxDate("SELECT min(Timestamp) FROM cosmo.otel_traces FORMAT TabSeparated")
		if startTimestampStr == "" {
			log.Fatal("Failed to retrieve start timestamp from the database.")
		}
		// Read the end timestamp and processed ranges from success log
		endTimestampStr, processedRangesRead := repopulator.ReadSuccessLog()
		processedRanges = processedRangesRead
		var err error
		endTimestamp, err = repopulator.ParseDateTime(endTimestampStr)
		if err != nil {
			// Without a valid end time the range loop would silently do nothing.
			log.Fatalf("Failed to parse end timestamp %q from success log: %v", endTimestampStr, err)
		}
		log.Println("Resume mode enabled. Using end date from success log and skipping processed ranges.")
		log.Printf("Start timestamp: %s\n", startTimestampStr)
		log.Printf("End timestamp: %s\n", repopulator.FormatDateTime(endTimestamp))
	default:
		// Clear the contents of success log and error log if run from scratch
		if err := os.WriteFile("success_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear success log: %v", err)
		}
		if err := os.WriteFile("error_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear error log: %v", err)
		}
		// Recreate tables and get the time when the materialized view was created
		endTimestamp = repopulator.RecreateTables()
		// Get the start timestamp after recreating the tables
		startTimestampStr = repopulator.GetMinMaxDate("SELECT min(Timestamp) FROM cosmo.otel_traces FORMAT TabSeparated")
		if startTimestampStr == "" {
			log.Fatal("Failed to retrieve start timestamp from the database.")
		}
		log.Printf("Start timestamp: %s\n", startTimestampStr)
		log.Printf("End timestamp: %s\n", repopulator.FormatDateTime(endTimestamp))
		// The first line of the success log is the end timestamp; --resume reads it back.
		repopulator.AppendToFile("success_log.txt", repopulator.FormatDateTime(endTimestamp)+"\n")
	}

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, ConcurrencyLimit)

	if retry {
		file, err := os.Open("error_log.txt")
		if err != nil {
			log.Fatalf("Failed to open error log: %v", err)
		}
		var failedJobs []string
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			// Trim so CRLF endings and stray whitespace don't break parsing;
			// blank lines are not failed jobs.
			line := strings.TrimSpace(scanner.Text())
			if line == "" {
				continue
			}
			failedJobs = append(failedJobs, line)
		}
		scanErr := scanner.Err()
		// Close before truncating the same file below. The file was opened
		// read-only, so a Close error carries no data-loss risk.
		_ = file.Close()
		if scanErr != nil {
			log.Fatalf("Failed to read error log: %v", scanErr)
		}
		// Clear the contents of the error_log.txt after all lines are read.
		// NOTE(review): if the process dies mid-retry, ranges not yet re-logged
		// are lost from error_log.txt; consider keeping a backup copy.
		if err := os.WriteFile("error_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear error log: %v", err)
		}
		// Each line starts with "<start date> <start time> <end date> <end time>";
		// anything after the fourth field is ignored.
		for _, job := range failedJobs {
			parts := strings.SplitN(job, " ", 5)
			if len(parts) < 4 {
				log.Printf("Invalid line in error log: %s\n", job)
				continue
			}
			startTimeStr := parts[0] + " " + parts[1]
			endTimeStr := parts[2] + " " + parts[3]
			startTime, err := repopulator.ParseDateTime(startTimeStr)
			if err != nil {
				log.Printf("Failed to parse startTime in error log: %s\n", startTimeStr)
				continue
			}
			endTime, err := repopulator.ParseDateTime(endTimeStr)
			if err != nil {
				log.Printf("Failed to parse endTime in error log: %s\n", endTimeStr)
				continue
			}
			wg.Add(1)
			go repopulator.ExecuteSQLCommand(startTime, endTime, &wg, semaphore)
		}
	} else {
		startTimestamp, err := repopulator.ParseDateTime(startTimestampStr)
		if err != nil {
			log.Fatalf("Failed to parse start timestamp: %v", err)
		}
		// Walk [start truncated to the hour, endTimestamp) in one-hour steps;
		// the final range is clipped to endTimestamp.
		currentTime := startTimestamp.Truncate(time.Hour)
		for currentTime.Before(endTimestamp) {
			nextTime := currentTime.Add(time.Hour)
			if nextTime.After(endTimestamp) {
				nextTime = endTimestamp
			}
			timeRangeKey := fmt.Sprintf("%s %s", repopulator.FormatDateTime(currentTime), repopulator.FormatDateTime(nextTime))
			// Indexing a nil map yields false, so this is safe outside resume mode.
			if !processedRanges[timeRangeKey] {
				wg.Add(1)
				go repopulator.ExecuteSQLCommand(currentTime, nextTime, &wg, semaphore)
			}
			currentTime = nextTime
		}
	}

	wg.Wait()
	log.Println("Data repopulation completed.")
}