in cmd/clickhouse-backup/main.go [26:84]
// start runs the backup command in one of two modes.
//
// If the DO_BACKUP environment flag is set, a single backup is executed and
// its result is returned. Otherwise the function connects to NATS at natsUrl,
// subscribes to the "db.backup" subject and executes a backup for each
// received request, skipping requests that arrive less than 24 hours after the
// last successful backup.
//
// It returns nil when the command context is cancelled, and an error if the
// NATS connection, the subscription, or receiving a message fails. A failed
// backup in subscription mode is only logged; the loop keeps running.
func start(natsUrl string) error {
	// Minimum time between two successful backups in subscription mode.
	const minBackupInterval = 24 * time.Hour

	taskContext, cancel := util.CreateCommandContext()
	defer cancel()

	// NOTE(review): an empty KUBERNETES_SERVICE_HOST presumably means a local
	// (non-cluster) run — confirm against SetS3EnvForLocalRun.
	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
		clickhousebackup.SetS3EnvForLocalRun(taskContext)
	}

	backuper := clickhousebackup.CreateBackuper()
	if env.GetBool("DO_BACKUP") {
		// One-shot mode: no NATS involved.
		return executeBackup(taskContext, backuper)
	}

	slog.Info("started", "nats", natsUrl)
	nc, err := nats.Connect(natsUrl)
	if err != nil {
		return fmt.Errorf("cannot connect to nats: %w", err)
	}
	// Release the connection (and its subscriptions) on every return path.
	defer nc.Close()

	sub, err := nc.SubscribeSync("db.backup")
	if err != nil {
		return fmt.Errorf("cannot subscribe to db.backup: %w", err)
	}

	// Zero time means "never backed up", so the first request always passes
	// the interval check below.
	var lastBackupTime time.Time
	for taskContext.Err() == nil {
		if _, err := sub.NextMsgWithContext(taskContext); err != nil {
			// A receive error caused by cancellation is a normal shutdown.
			if contextError := taskContext.Err(); contextError != nil {
				slog.Info("cancelled", "reason", contextError)
				return nil
			}
			return fmt.Errorf("cannot receive message: %w", err)
		}

		// Do not start a backup if cancellation raced with message delivery.
		if taskContext.Err() != nil {
			return nil
		}

		if time.Since(lastBackupTime) < minBackupInterval {
			// do not create backups too often
			slog.Info("backup request skipped", "reason", "time threshold", "lastBackupTime", lastBackupTime)
			continue
		}

		slog.Info("backup requested")
		if err := executeBackup(taskContext, backuper); err != nil {
			// Keep lastBackupTime unchanged so the next request retries.
			slog.Error("cannot backup", "error", err)
			continue
		}
		lastBackupTime = time.Now()
	}
	return nil
}