func main()

in data_migrations/controlplane/1726744316_add_span_id_to_trace/files/migrate.go [274:417]


// main runs the trace repopulation in one of three modes, selected by an
// optional flag following the connection string:
//
//   - no flag (fresh run): truncates success_log.txt and error_log.txt,
//     recreates the tables, and schedules every hourly range from the oldest
//     trace timestamp up to the timestamp returned by RecreateTables.
//   - --resume: reads the end timestamp and the already processed ranges from
//     the success log and schedules only the ranges not listed there.
//   - --retry: schedules only the ranges listed in error_log.txt, truncating
//     that file after it has been read.
//
// Usage: <program> <connection-string> [--retry | --resume]
func main() {
	// Guard against a missing connection string; indexing os.Args[1] blindly
	// would panic with an index-out-of-range instead of a usable message.
	if len(os.Args) < 2 {
		log.Fatal("Usage: <program> <connection-string> [--retry | --resume]")
	}

	// Initialize the Repopulator struct
	repopulator := Repopulator{
		ConnectionString: os.Args[1],
	}

	// Scan every argument after the connection string so that passing both
	// flags is actually detected (checking a single position never could).
	retry, resume := false, false
	for _, arg := range os.Args[2:] {
		switch arg {
		case "--retry":
			retry = true
		case "--resume":
			resume = true
		}
	}

	if resume && retry {
		log.Fatal("Both resume and retry flags detected. Please use one.")
	}

	var startTimestampStr string
	var endTimestamp time.Time
	var processedRanges map[string]bool

	switch {
	case retry:
		log.Println("Retry mode enabled. Processing dates and hours from error_log.txt.")

	case resume:
		startTimestampStr = repopulator.GetMinMaxDate("SELECT min(Timestamp) FROM cosmo.otel_traces FORMAT TabSeparated")
		if startTimestampStr == "" {
			log.Fatal("Failed to retrieve start timestamp from the database.")
		}

		// Read the end timestamp and processed ranges from success log
		endTimestampStr, processedRangesRead := repopulator.ReadSuccessLog()
		processedRanges = processedRangesRead

		// A missing or corrupt end timestamp must abort the run: with a zero
		// end time the loop below would schedule nothing and the tool would
		// still report success.
		parsedEnd, err := repopulator.ParseDateTime(endTimestampStr)
		if err != nil {
			log.Fatalf("Failed to parse end timestamp %q from success log: %v", endTimestampStr, err)
		}
		endTimestamp = parsedEnd

		log.Println("Resume mode enabled. Using end date from success log and skipping processed ranges.")
		log.Printf("Start timestamp: %s\n", startTimestampStr)
		log.Printf("End timestamp: %s\n", repopulator.FormatDateTime(endTimestamp))

	default:
		// Clear the contents of success log and error log if run from scratch
		if err := os.WriteFile("success_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear success log: %v", err)
		}
		if err := os.WriteFile("error_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear error log: %v", err)
		}

		// Recreate tables; the returned time is used as the upper bound of
		// the ranges to repopulate.
		endTimestamp = repopulator.RecreateTables()

		// Get the start timestamp after recreating the tables
		startTimestampStr = repopulator.GetMinMaxDate("SELECT min(Timestamp) FROM cosmo.otel_traces FORMAT TabSeparated")
		if startTimestampStr == "" {
			log.Fatal("Failed to retrieve start timestamp from the database.")
		}

		log.Printf("Start timestamp: %s\n", startTimestampStr)
		log.Printf("End timestamp: %s\n", repopulator.FormatDateTime(endTimestamp))

		// Append the end timestamp as the first line to the success log so a
		// later --resume run can pick it up.
		repopulator.AppendToFile("success_log.txt", repopulator.FormatDateTime(endTimestamp)+"\n")
	}

	// NOTE(review): ExecuteSQLCommand receives the WaitGroup and the
	// semaphore; presumably it calls wg.Done and uses the channel to bound
	// concurrency to ConcurrencyLimit — confirm against its definition.
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, ConcurrencyLimit)

	if retry {
		file, err := os.Open("error_log.txt")
		if err != nil {
			log.Fatalf("Failed to open error log: %v", err)
		}

		var failedJobs []string
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			failedJobs = append(failedJobs, scanner.Text())
		}
		scanErr := scanner.Err()

		// Close the read handle before the file is truncated below; a
		// deferred Close would be skipped by log.Fatalf anyway.
		if err := file.Close(); err != nil {
			log.Printf("Failed to close error log: %v", err)
		}
		if scanErr != nil {
			log.Fatalf("Failed to read error log: %v", scanErr)
		}

		// Clear the contents of the error_log.txt after all lines are read
		if err := os.WriteFile("error_log.txt", []byte{}, 0644); err != nil {
			log.Fatalf("Failed to clear error log: %v", err)
		}

		// Each line is expected to start with "<date> <time> <date> <time>";
		// anything after the fourth space-separated field is ignored.
		for _, job := range failedJobs {
			parts := strings.SplitN(job, " ", 5)
			if len(parts) < 4 {
				log.Printf("Invalid line in error log: %s\n", job)
				continue
			}
			startTimeStr := parts[0] + " " + parts[1]
			endTimeStr := parts[2] + " " + parts[3]

			startTime, err := repopulator.ParseDateTime(startTimeStr)
			if err != nil {
				log.Printf("Failed to parse startTime in error log: %s\n", startTimeStr)
				continue
			}
			endTime, err := repopulator.ParseDateTime(endTimeStr)
			if err != nil {
				log.Printf("Failed to parse endTime in error log: %s\n", endTimeStr)
				continue
			}

			wg.Add(1)
			go repopulator.ExecuteSQLCommand(startTime, endTime, &wg, semaphore)
		}
	} else {
		// Parse startTimestamp
		startTimestamp, err := repopulator.ParseDateTime(startTimestampStr)
		if err != nil {
			log.Fatalf("Failed to parse start timestamp: %v", err)
		}

		// Walk hour-aligned windows from the truncated start up to the end
		// timestamp; the final window is clamped to endTimestamp.
		currentTime := startTimestamp.Truncate(time.Hour)
		for currentTime.Before(endTimestamp) {
			nextTime := currentTime.Add(time.Hour)
			if nextTime.After(endTimestamp) {
				nextTime = endTimestamp
			}

			// Lookup on a nil map is safe and yields false, so fresh runs
			// (processedRanges == nil) never skip a range.
			timeRangeKey := fmt.Sprintf("%s %s", repopulator.FormatDateTime(currentTime), repopulator.FormatDateTime(nextTime))
			if !processedRanges[timeRangeKey] {
				wg.Add(1)
				go repopulator.ExecuteSQLCommand(currentTime, nextTime, &wg, semaphore)
			}

			currentTime = nextTime
		}
	}

	wg.Wait()
	log.Println("Data repopulation completed.")
}