internal/praefect/config/config.go

package config

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"sort"
	"time"

	"github.com/hashicorp/yamux"
	"github.com/pelletier/go-toml/v2"
	promclient "github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/errors/cfgerror"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/prometheus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/sentry"
	"gitlab.com/gitlab-org/gitaly/v16/internal/helper/duration"
	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
)

// ElectionStrategy is a Praefect primary election strategy.
type ElectionStrategy string

// validate validates the election strategy is a valid one.
func (es ElectionStrategy) validate() error {
	switch es {
	case ElectionStrategyLocal, ElectionStrategySQL, ElectionStrategyPerRepository:
		return nil
	default:
		return fmt.Errorf("invalid election strategy: %q", es)
	}
}

const (
	// ElectionStrategyLocal configures a single node, in-memory election strategy.
	ElectionStrategyLocal ElectionStrategy = "local"
	// ElectionStrategySQL configures an SQL based strategy that elects a primary for a virtual storage.
	ElectionStrategySQL ElectionStrategy = "sql"
	// ElectionStrategyPerRepository configures an SQL based strategy that elects different primaries per repository.
	ElectionStrategyPerRepository ElectionStrategy = "per_repository"

	minimalSyncCheckInterval = time.Minute
	minimalSyncRunInterval   = time.Minute
)

// Failover contains configuration for the mechanism that tracks healthiness of the cluster nodes.
type Failover struct {
	// Enabled is a trigger used to check if failover is enabled or not.
	Enabled bool `json:"enabled" toml:"enabled,omitempty"`
	// ElectionStrategy is the strategy to use for electing primary nodes.
	ElectionStrategy         ElectionStrategy  `json:"election_strategy" toml:"election_strategy,omitempty"`
	ErrorThresholdWindow     duration.Duration `json:"error_threshold_window" toml:"error_threshold_window,omitempty"`
	WriteErrorThresholdCount uint32            `json:"write_error_threshold_count" toml:"write_error_threshold_count,omitempty"`
	ReadErrorThresholdCount  uint32            `json:"read_error_threshold_count" toml:"read_error_threshold_count,omitempty"`
	// BootstrapInterval sets the time duration used on startup to perform the initial health check.
	// The default value is 1s.
	BootstrapInterval duration.Duration `json:"bootstrap_interval" toml:"bootstrap_interval,omitempty"`
	// MonitorInterval sets the time duration used to execute health checks after bootstrap is completed.
	// The default value is 3s.
	MonitorInterval duration.Duration `json:"monitor_interval" toml:"monitor_interval,omitempty"`
}

// ErrorThresholdsConfigured returns whether the error thresholds are configured. If they
// are configured but in an invalid way, an error is returned.
func (f Failover) ErrorThresholdsConfigured() (bool, error) {
	if f.ErrorThresholdWindow == 0 && f.WriteErrorThresholdCount == 0 && f.ReadErrorThresholdCount == 0 {
		return false, nil
	}

	if f.ErrorThresholdWindow == 0 {
		return false, errors.New("threshold window not set")
	}

	if f.WriteErrorThresholdCount == 0 {
		return false, errors.New("write error threshold not set")
	}

	if f.ReadErrorThresholdCount == 0 {
		return false, errors.New("read error threshold not set")
	}

	return true, nil
}
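// Editor-added, illustrative sketch of what a [failover] TOML section could look like, derived
// from the toml tags on the Failover struct above. The values are examples only, not defaults or
// recommendations. Note that the three error-threshold keys must either all be set or all be left
// unset, as enforced by ErrorThresholdsConfigured and Validate.
//
//	[failover]
//	enabled = true
//	election_strategy = "per_repository"
//	error_threshold_window = "1m"
//	write_error_threshold_count = 5
//	read_error_threshold_count = 10
//	bootstrap_interval = "1s"
//	monitor_interval = "3s"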
// Validate returns a list of failed checks.
func (f Failover) Validate() error {
	if !f.Enabled {
		// If it is not enabled we shouldn't care about provided values
		// as they won't be used.
		return nil
	}

	errs := cfgerror.New().
		Append(cfgerror.IsSupportedValue(f.ElectionStrategy, ElectionStrategyLocal, ElectionStrategySQL, ElectionStrategyPerRepository), "election_strategy").
		Append(cfgerror.Comparable(f.BootstrapInterval.Duration()).GreaterOrEqual(0), "bootstrap_interval").
		Append(cfgerror.Comparable(f.MonitorInterval.Duration()).GreaterOrEqual(0), "monitor_interval").
		Append(cfgerror.Comparable(f.ErrorThresholdWindow.Duration()).GreaterOrEqual(0), "error_threshold_window")

	if f.ErrorThresholdWindow == 0 && f.WriteErrorThresholdCount == 0 && f.ReadErrorThresholdCount == 0 {
		return errs.AsError()
	}

	if f.ErrorThresholdWindow == 0 {
		errs = errs.Append(cfgerror.ErrNotSet, "error_threshold_window")
	}

	if f.WriteErrorThresholdCount == 0 {
		errs = errs.Append(cfgerror.ErrNotSet, "write_error_threshold_count")
	}

	if f.ReadErrorThresholdCount == 0 {
		errs = errs.Append(cfgerror.ErrNotSet, "read_error_threshold_count")
	}

	return errs.AsError()
}

// BackgroundVerification contains configuration options for the repository background verification.
type BackgroundVerification struct {
	// VerificationInterval determines the duration after which a replica is due for reverification.
	// The feature is disabled if verification interval is 0 or below.
	VerificationInterval duration.Duration `json:"verification_interval" toml:"verification_interval,omitempty"`
	// DeleteInvalidRecords controls whether the background verifier will actually delete the metadata
	// records that point to non-existent replicas.
	DeleteInvalidRecords bool `json:"delete_invalid_records" toml:"delete_invalid_records"`
}

// Validate runs validation on all fields and composes all found errors.
func (bv BackgroundVerification) Validate() error {
	return cfgerror.New().
		Append(cfgerror.Comparable(bv.VerificationInterval.Duration()).GreaterOrEqual(0), "verification_interval").
		AsError()
}

// DefaultBackgroundVerificationConfig returns the default background verification configuration.
func DefaultBackgroundVerificationConfig() BackgroundVerification {
	return BackgroundVerification{
		VerificationInterval: duration.Duration(7 * 24 * time.Hour),
		DeleteInvalidRecords: true,
	}
}

// Reconciliation contains reconciliation specific configuration options.
type Reconciliation struct {
	// SchedulingInterval is the interval between each automatic reconciliation run. If set to 0,
	// automatic reconciliation is disabled.
	SchedulingInterval duration.Duration `json:"scheduling_interval" toml:"scheduling_interval,omitempty"`
	// HistogramBuckets configures the reconciliation scheduling duration histogram's buckets.
	HistogramBuckets []float64 `json:"histogram_buckets" toml:"histogram_buckets,omitempty"`
}

// Validate runs validation on all fields and composes all found errors.
func (r Reconciliation) Validate() error {
	errs := cfgerror.New().
		Append(cfgerror.Comparable(r.SchedulingInterval.Duration()).GreaterOrEqual(0), "scheduling_interval")

	if r.SchedulingInterval != 0 {
		if !sort.Float64sAreSorted(r.HistogramBuckets) {
			errs = errs.Append(cfgerror.ErrBadOrder, "histogram_buckets")
		}
	}

	return errs.AsError()
}
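// Editor-added, illustrative sketch of the corresponding TOML sections, derived from the toml
// tags on BackgroundVerification and Reconciliation above; the values are examples, not
// recommendations. The histogram_buckets shown mirror the Prometheus default buckets used by
// DefaultReconciliationConfig below.
//
//	[background_verification]
//	verification_interval = "168h"
//	delete_invalid_records = true
//
//	[reconciliation]
//	scheduling_interval = "5m"
//	histogram_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]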
// DefaultReconciliationConfig returns the default values for reconciliation configuration.
func DefaultReconciliationConfig() Reconciliation {
	return Reconciliation{
		SchedulingInterval: 5 * duration.Duration(time.Minute),
		HistogramBuckets:   promclient.DefBuckets,
	}
}

// Replication contains replication specific configuration options.
type Replication struct {
	// BatchSize controls how many replication jobs to dequeue and lock
	// in a single call to the database.
	BatchSize uint `json:"batch_size" toml:"batch_size,omitempty"`
	// ParallelStorageProcessingWorkers is the number of workers used to process replication
	// events per virtual storage (how many storages would be processed in parallel).
	ParallelStorageProcessingWorkers uint `json:"parallel_storage_processing_workers" toml:"parallel_storage_processing_workers,omitempty"`
}

// Validate runs validation on all fields and composes all found errors.
func (r Replication) Validate() error {
	return cfgerror.New().
		Append(cfgerror.Comparable(r.BatchSize).GreaterOrEqual(1), "batch_size").
		Append(cfgerror.Comparable(r.ParallelStorageProcessingWorkers).GreaterOrEqual(1), "parallel_storage_processing_workers").
		AsError()
}

// DefaultReplicationConfig returns the default values for replication configuration.
func DefaultReplicationConfig() Replication {
	return Replication{BatchSize: 10, ParallelStorageProcessingWorkers: 1}
}
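// Editor-added, illustrative sketch of a minimal Praefect configuration file, derived from the
// toml tags on the Config and VirtualStorage types below. The node keys ("storage", "address")
// belong to the Node type referenced by VirtualStorage, which is defined elsewhere in this
// package; the addresses and storage names here are placeholders, not defaults.
//
//	listen_addr = "0.0.0.0:2305"
//
//	[[virtual_storage]]
//	name = "default"
//
//	[[virtual_storage.node]]
//	storage = "gitaly-1"
//	address = "tcp://gitaly-1.internal:8075"
//
//	[[virtual_storage.node]]
//	storage = "gitaly-2"
//	address = "tcp://gitaly-2.internal:8075"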
// Config is a container for everything found in the TOML config file.
type Config struct {
	// ConfigCommand specifies the path to an executable that Praefect will run after loading the initial
	// configuration. The executable is expected to write JSON-formatted configuration to its standard
	// output, which is then deserialized and merged back into the initially-loaded configuration.
	// This is an easy mechanism to generate parts of the configuration at runtime, like for example secrets.
	ConfigCommand string `json:"config_command" toml:"config_command,omitempty"`

	AllowLegacyElectors    bool                   `json:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning" toml:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning,omitempty"`
	BackgroundVerification BackgroundVerification `json:"background_verification" toml:"background_verification,omitempty"`
	Reconciliation         Reconciliation         `json:"reconciliation" toml:"reconciliation,omitempty"`
	Replication            Replication            `json:"replication" toml:"replication,omitempty"`
	ListenAddr             string                 `json:"listen_addr" toml:"listen_addr,omitempty"`
	TLSListenAddr          string                 `json:"tls_listen_addr" toml:"tls_listen_addr,omitempty"`
	SocketPath             string                 `json:"socket_path" toml:"socket_path,omitempty"`
	VirtualStorages        []*VirtualStorage      `json:"virtual_storage" toml:"virtual_storage,omitempty"`
	Logging                log.Config             `json:"logging" toml:"logging,omitempty"`
	Sentry                 sentry.Config          `json:"sentry" toml:"sentry,omitempty"`
	PrometheusListenAddr   string                 `json:"prometheus_listen_addr" toml:"prometheus_listen_addr,omitempty"`
	Prometheus             prometheus.Config      `json:"prometheus" toml:"prometheus,omitempty"`
	Auth                   auth.Config            `json:"auth" toml:"auth,omitempty"`
	TLS                    config.TLS             `json:"tls" toml:"tls,omitempty"`
	DB                     `json:"database" toml:"database,omitempty"`
	Failover               Failover            `json:"failover" toml:"failover,omitempty"`
	MemoryQueueEnabled     bool                `json:"memory_queue_enabled" toml:"memory_queue_enabled,omitempty"`
	GracefulStopTimeout    duration.Duration   `json:"graceful_stop_timeout" toml:"graceful_stop_timeout,omitempty"`
	RepositoriesCleanup    RepositoriesCleanup `json:"repositories_cleanup" toml:"repositories_cleanup,omitempty"`
	Yamux                  Yamux               `json:"yamux" toml:"yamux,omitempty"`
}

// Yamux contains Yamux related configuration values.
type Yamux struct {
	// MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams.
	// A higher value can increase throughput at the cost of more memory usage.
	MaximumStreamWindowSizeBytes uint32 `json:"maximum_stream_window_size_bytes" toml:"maximum_stream_window_size_bytes,omitempty"`
	// AcceptBacklog sets the maximum number of stream openings in-flight before further openings
	// block.
	AcceptBacklog uint `json:"accept_backlog" toml:"accept_backlog,omitempty"`
}

func (cfg Yamux) validate() error {
	if cfg.MaximumStreamWindowSizeBytes < 262144 {
		// Yamux requires the stream window size to be at minimum 256KiB.
		return fmt.Errorf("yamux.maximum_stream_window_size_bytes must be at least 262144 but it was %d", cfg.MaximumStreamWindowSizeBytes)
	}

	if cfg.AcceptBacklog < 1 {
		// Yamux requires accept backlog to be at least 1.
		return fmt.Errorf("yamux.accept_backlog must be at least 1 but it was %d", cfg.AcceptBacklog)
	}

	return nil
}

// Validate runs validation on all fields and composes all found errors.
func (cfg Yamux) Validate() error {
	return cfgerror.New().
		Append(cfgerror.Comparable(cfg.MaximumStreamWindowSizeBytes).GreaterOrEqual(262144), "maximum_stream_window_size_bytes").
		Append(cfgerror.Comparable(cfg.AcceptBacklog).GreaterOrEqual(1), "accept_backlog").
		AsError()
}

// DefaultYamuxConfig returns the default Yamux configuration values.
func DefaultYamuxConfig() Yamux {
	defaultCfg := yamux.DefaultConfig()

	return Yamux{
		MaximumStreamWindowSizeBytes: defaultCfg.MaxStreamWindowSize,
		AcceptBacklog:                uint(defaultCfg.AcceptBacklog),
	}
}
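// Editor-added, illustrative sketch of the config_command mechanism documented on the Config
// struct above: the named executable runs after the TOML is decoded, and any JSON it prints to
// stdout is unmarshalled over the already-loaded configuration (see FromReader below). The script
// path and the secret value are hypothetical; the JSON keys correspond to the json tags on Config
// and DB.
//
//	config_command = "/var/opt/praefect/fetch-secrets.sh"
//
// where the script prints, for example:
//
//	{"database": {"password": "fetched-at-runtime"}}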
// DefaultLoggingConfig returns the default logging configuration.
func DefaultLoggingConfig() log.Config {
	return log.Config{
		Format: "text",
		Level:  "info",
	}
}

// VirtualStorage represents a set of nodes for a storage.
type VirtualStorage struct {
	Name  string  `json:"name" toml:"name,omitempty"`
	Nodes []*Node `json:"node" toml:"node,omitempty"`
	// DefaultReplicationFactor is the replication factor set for new repositories.
	// A valid value is inclusive between 1 and the number of configured storages in the
	// virtual storage. Setting the value to 0 or below causes Praefect to not store any
	// host assignments, falling back to the behavior of replicating to every configured
	// storage.
	DefaultReplicationFactor int `json:"default_replication_factor" toml:"default_replication_factor,omitempty"`
}

// Validate runs validation on all fields and composes all found errors.
func (vs VirtualStorage) Validate() error {
	errs := cfgerror.New().
		Append(cfgerror.NotBlank(vs.Name), "name").
		Append(cfgerror.NotEmptySlice(vs.Nodes), "node")

	for i, node := range vs.Nodes {
		errs = errs.Append(node.Validate(), "node", fmt.Sprintf("[%d]", i))
	}

	return errs.AsError()
}

// FromFile loads the config for the passed file path.
func FromFile(filePath string) (Config, error) {
	b, err := os.ReadFile(filePath)
	if err != nil {
		return Config{}, err
	}

	return FromReader(bytes.NewReader(b))
}

// FromReader loads the config for the passed data stream.
func FromReader(reader io.Reader) (Config, error) {
	conf := &Config{
		BackgroundVerification: DefaultBackgroundVerificationConfig(),
		Reconciliation:         DefaultReconciliationConfig(),
		Replication:            DefaultReplicationConfig(),
		Prometheus:             prometheus.DefaultConfig(),
		// Sets the default Failover, to be overwritten when deserializing the TOML.
		Failover:            Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository},
		RepositoriesCleanup: DefaultRepositoriesCleanup(),
		Yamux:               DefaultYamuxConfig(),
		Logging:             DefaultLoggingConfig(),
		TLS:                 config.NewTLS(),
	}

	if err := toml.NewDecoder(reader).Decode(conf); err != nil {
		return Config{}, err
	}

	if conf.ConfigCommand != "" {
		output, err := exec.Command(conf.ConfigCommand).Output()
		if err != nil {
			var exitErr *exec.ExitError
			if errors.As(err, &exitErr) {
				return Config{}, fmt.Errorf("running config command: %w, stderr: %q", err, string(exitErr.Stderr))
			}

			return Config{}, fmt.Errorf("running config command: %w", err)
		}

		if err := json.Unmarshal(output, &conf); err != nil {
			return Config{}, fmt.Errorf("unmarshalling generated config: %w", err)
		}
	}

	conf.setDefaults()

	return *conf, nil
}

var (
	errDuplicateStorage         = errors.New("internal gitaly storages are not unique")
	errGitalyWithoutAddr        = errors.New("all gitaly nodes must have an address")
	errGitalyWithoutStorage     = errors.New("all gitaly nodes must have a storage")
	errNoGitalyServers          = errors.New("no primary gitaly backends configured")
	errNoListener               = errors.New("no listen address or socket path configured")
	errNoVirtualStorages        = errors.New("no virtual storages configured")
	errVirtualStoragesNotUnique = errors.New("virtual storages must have unique names")
	errVirtualStorageUnnamed    = errors.New("virtual storages must have a name")
)

// Validate establishes if the config is valid.
func (c *Config) Validate() error {
	if err := c.Failover.ElectionStrategy.validate(); err != nil {
		return err
	}

	if c.ListenAddr == "" && c.SocketPath == "" && c.TLSListenAddr == "" {
		return errNoListener
	}

	if len(c.VirtualStorages) == 0 {
		return errNoVirtualStorages
	}

	if c.Replication.BatchSize < 1 {
		return fmt.Errorf("replication batch size was %d but must be >=1", c.Replication.BatchSize)
	}
	virtualStorages := make(map[string]struct{}, len(c.VirtualStorages))

	for _, virtualStorage := range c.VirtualStorages {
		if virtualStorage.Name == "" {
			return errVirtualStorageUnnamed
		}

		if len(virtualStorage.Nodes) == 0 {
			return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errNoGitalyServers)
		}

		if _, ok := virtualStorages[virtualStorage.Name]; ok {
			return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errVirtualStoragesNotUnique)
		}

		virtualStorages[virtualStorage.Name] = struct{}{}

		storages := make(map[string]struct{}, len(virtualStorage.Nodes))
		for _, node := range virtualStorage.Nodes {
			if node.Storage == "" {
				return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errGitalyWithoutStorage)
			}

			if node.Address == "" {
				return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errGitalyWithoutAddr)
			}

			if _, found := storages[node.Storage]; found {
				return fmt.Errorf("virtual storage %q: %w", virtualStorage.Name, errDuplicateStorage)
			}

			storages[node.Storage] = struct{}{}
		}

		if virtualStorage.DefaultReplicationFactor > len(virtualStorage.Nodes) {
			return fmt.Errorf(
				"virtual storage %q has a default replication factor (%d) which is higher than the number of storages (%d)",
				virtualStorage.Name, virtualStorage.DefaultReplicationFactor, len(virtualStorage.Nodes),
			)
		}
	}

	if c.RepositoriesCleanup.RunInterval.Duration() > 0 {
		if c.RepositoriesCleanup.CheckInterval.Duration() < minimalSyncCheckInterval {
			return fmt.Errorf("repositories_cleanup.check_interval is less than %s, which could lead to a database performance problem", minimalSyncCheckInterval.String())
		}
		if c.RepositoriesCleanup.RunInterval.Duration() < minimalSyncRunInterval {
			return fmt.Errorf("repositories_cleanup.run_interval is less than %s, which could lead to a database performance problem", minimalSyncRunInterval.String())
		}
	}

	if err := c.Yamux.validate(); err != nil {
		return err
	}

	return nil
}

// ValidateV2 is a new validation method that is a replacement for the existing Validate.
// It exists as a demonstration of the new validation implementation based on the usage
// of the cfgerror package.
func (c *Config) ValidateV2() error {
	errs := cfgerror.New().
		Append(func() error {
			if c.SocketPath == "" && c.ListenAddr == "" && c.TLSListenAddr == "" {
				return fmt.Errorf(`none of "socket_path", "listen_addr" or "tls_listen_addr" is set`)
			}
			return nil
		}()).
		Append(c.BackgroundVerification.Validate(), "background_verification").
		Append(c.Reconciliation.Validate(), "reconciliation").
		Append(c.Replication.Validate(), "replication").
		Append(c.Prometheus.Validate(), "prometheus").
		Append(c.TLS.Validate(), "tls").
		Append(c.Failover.Validate(), "failover").
		Append(cfgerror.Comparable(c.GracefulStopTimeout.Duration()).GreaterOrEqual(0), "graceful_stop_timeout").
		Append(c.RepositoriesCleanup.Validate(), "repositories_cleanup").
		Append(c.Yamux.Validate(), "yamux").
		Append(cfgerror.NotEmptySlice(c.VirtualStorages), "virtual_storage")

	for i, storage := range c.VirtualStorages {
		errs = errs.Append(storage.Validate(), "virtual_storage", fmt.Sprintf("[%d]", i))
	}

	return errs.AsError()
}

// NeedsSQL returns true if the driver for SQL needs to be initialized.
func (c *Config) NeedsSQL() bool {
	return !c.MemoryQueueEnabled || (c.Failover.Enabled && c.Failover.ElectionStrategy != ElectionStrategyLocal)
}

func (c *Config) setDefaults() {
	if c.GracefulStopTimeout.Duration() == 0 {
		c.GracefulStopTimeout = duration.Duration(time.Minute)
	}

	if c.Failover.Enabled {
		if c.Failover.BootstrapInterval.Duration() == 0 {
			c.Failover.BootstrapInterval = duration.Duration(time.Second)
		}

		if c.Failover.MonitorInterval.Duration() == 0 {
			c.Failover.MonitorInterval = duration.Duration(3 * time.Second)
		}
	}
}

// VirtualStorageNames returns names of all virtual storages configured.
func (c *Config) VirtualStorageNames() []string {
	names := make([]string, len(c.VirtualStorages))
	for i, virtual := range c.VirtualStorages {
		names[i] = virtual.Name
	}
	return names
}

// StorageNames returns storage names by virtual storage.
func (c *Config) StorageNames() map[string][]string {
	storages := make(map[string][]string, len(c.VirtualStorages))
	for _, vs := range c.VirtualStorages {
		nodes := make([]string, len(vs.Nodes))
		for i, n := range vs.Nodes {
			nodes[i] = n.Storage
		}

		storages[vs.Name] = nodes
	}

	return storages
}

// DefaultReplicationFactors returns a map with the default replication factors of
// the virtual storages.
func (c Config) DefaultReplicationFactors() map[string]int {
	replicationFactors := make(map[string]int, len(c.VirtualStorages))
	for _, vs := range c.VirtualStorages {
		replicationFactors[vs.Name] = vs.DefaultReplicationFactor
	}

	return replicationFactors
}

// DBConnection holds Postgres client configuration data.
type DBConnection struct {
	Host        string `json:"host" toml:"host,omitempty"`
	Port        int    `json:"port" toml:"port,omitempty"`
	User        string `json:"user" toml:"user,omitempty"`
	Password    string `json:"password" toml:"password,omitempty"`
	DBName      string `json:"dbname" toml:"dbname,omitempty"`
	SSLMode     string `json:"sslmode" toml:"sslmode,omitempty"`
	SSLCert     string `json:"sslcert" toml:"sslcert,omitempty"`
	SSLKey      string `json:"sslkey" toml:"sslkey,omitempty"`
	SSLRootCert string `json:"sslrootcert" toml:"sslrootcert,omitempty"`
}

// DB holds database configuration data.
type DB struct {
	Host        string `json:"host" toml:"host,omitempty"`
	Port        int    `json:"port" toml:"port,omitempty"`
	User        string `json:"user" toml:"user,omitempty"`
	Password    string `json:"password" toml:"password,omitempty"`
	DBName      string `json:"dbname" toml:"dbname,omitempty"`
	SSLMode     string `json:"sslmode" toml:"sslmode,omitempty"`
	SSLCert     string `json:"sslcert" toml:"sslcert,omitempty"`
	SSLKey      string `json:"sslkey" toml:"sslkey,omitempty"`
	SSLRootCert string `json:"sslrootcert" toml:"sslrootcert,omitempty"`

	SessionPooled DBConnection `json:"session_pooled" toml:"session_pooled,omitempty"`

	// The following configuration keys are deprecated and
	// will be removed. Use Host and Port attributes of
	// SessionPooled instead.
	HostNoProxy string `json:"host_no_proxy" toml:"host_no_proxy,omitempty"`
	PortNoProxy int    `json:"port_no_proxy" toml:"port_no_proxy,omitempty"`
}
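// Editor-added, illustrative sketch of the corresponding [database] TOML section, derived from
// the toml tags on DB and DBConnection above. Host names, port numbers and credentials are
// placeholders only.
//
//	[database]
//	host = "postgres.internal"
//	port = 5432
//	user = "praefect"
//	password = "secret"
//	dbname = "praefect_production"
//	sslmode = "require"
//
//	[database.session_pooled]
//	host = "pgbouncer.internal"
//	port = 6432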
// RepositoriesCleanup configures repository synchronisation.
type RepositoriesCleanup struct {
	// CheckInterval is a time period used to check if the operation should be executed.
	// It is recommended to keep it less than the run_interval configuration as some
	// nodes may be out of service, so they can be stale for too long.
	CheckInterval duration.Duration `json:"check_interval" toml:"check_interval,omitempty"`
	// RunInterval: the check runs if the previous operation was done at least RunInterval before.
	RunInterval duration.Duration `json:"run_interval" toml:"run_interval,omitempty"`
	// RepositoriesInBatch is the number of repositories to pass as a batch for processing.
	RepositoriesInBatch uint `json:"repositories_in_batch" toml:"repositories_in_batch,omitempty"`
}

// Validate runs validation on all fields and composes all found errors.
func (rc RepositoriesCleanup) Validate() error {
	if rc.RunInterval == 0 {
		// If RunInterval is set to 0 it means it is disabled. The validation doesn't make
		// sense then as it won't be used.
		return nil
	}

	return cfgerror.New().
		Append(cfgerror.Comparable(rc.CheckInterval.Duration()).GreaterOrEqual(minimalSyncCheckInterval), "check_interval").
		Append(cfgerror.Comparable(rc.RunInterval.Duration()).GreaterOrEqual(minimalSyncRunInterval), "run_interval").
		Append(cfgerror.Comparable(rc.RepositoriesInBatch).GreaterOrEqual(1), "repositories_in_batch").
		AsError()
}

// DefaultRepositoriesCleanup contains default configuration values for the RepositoriesCleanup.
func DefaultRepositoriesCleanup() RepositoriesCleanup {
	return RepositoriesCleanup{
		CheckInterval:       duration.Duration(30 * time.Minute),
		RunInterval:         duration.Duration(24 * time.Hour),
		RepositoriesInBatch: 16,
	}
}
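// Editor-added, illustrative sketch of how this package is typically driven, using only the
// exported functions defined above; error handling is elided and the configuration path is a
// placeholder.
//
//	cfg, err := config.FromFile("/etc/gitlab/praefect.config.toml")
//	if err != nil { /* handle error */ }
//	if err := cfg.Validate(); err != nil { /* handle invalid configuration */ }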