func start()

in cmd/clickhouse-backup/main.go [26:84]


func start(natsUrl string) error {
	taskContext, cancel := util.CreateCommandContext()
	defer cancel()

	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
		clickhousebackup.SetS3EnvForLocalRun(taskContext)
	}

	backuper := clickhousebackup.CreateBackuper()

	if env.GetBool("DO_BACKUP") {
		err := executeBackup(taskContext, backuper)
		return err
	}

	slog.Info("started", "nats", natsUrl)
	nc, err := nats.Connect(natsUrl)
	if err != nil {
		return fmt.Errorf("cannot connect to nats: %w", err)
	}

	sub, err := nc.SubscribeSync("db.backup")
	if err != nil {
		return fmt.Errorf("cannot subscribe to db.backup: %w", err)
	}

	lastBackupTime := time.Time{}
	for taskContext.Err() == nil {
		_, err = sub.NextMsgWithContext(taskContext)
		if err != nil {
			contextError := taskContext.Err()
			if contextError != nil {
				slog.Info("cancelled", "reason", contextError)
				return nil
			}
			return fmt.Errorf("cannot receive message: %w", err)
		}

		if taskContext.Err() != nil {
			return nil
		}

		if time.Since(lastBackupTime) < 24*time.Hour {
			// do not create backups too often
			slog.Info("backup request skipped", "reason", "time threshold", "lastBackupTime", lastBackupTime)
			continue
		}

		slog.Info("backup requested")
		err = executeBackup(taskContext, backuper)
		if err != nil {
			slog.Error("cannot backup", "error", err)
		} else {
			lastBackupTime = time.Now()
		}
	}

	return nil
}