config/config.go (325 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package config

import (
	"fmt"
	"os"
	"runtime"
	"sync/atomic"
	"text/template"
	"time"

	"github.com/uber/storagetapper/types"
)

// AppConfigODS is the config struct which the config gets loaded into.
// It is the on-disk/serialized part of the configuration; AppConfig embeds it
// and adds the fields derived from it by parseConfig.
type AppConfigODS struct {
	LogType  string `yaml:"log_type"`
	LogLevel string `yaml:"log_level"`

	ServiceName string `yaml:"serviceName"`
	Port        int    `yaml:"port"`
	MaxNumProcs int    `yaml:"max_num_procs"`

	StateUpdateInterval time.Duration `yaml:"state_update_interval"`
	WorkerIdleInterval  time.Duration `yaml:"worker_idle_interval"`
	// LockExpireTimeout defaults to 2 * StateUpdateInterval when left zero
	// (see parseConfig).
	LockExpireTimeout time.Duration `yaml:"lock_expire_timeout"`
	StateConnectURL   string        `yaml:"state_connect_url"`
	StateDBName       string        `yaml:"state_db_name"`
	StateClusterName  string        `yaml:"state_cluster_name"`

	ChangelogPipeType                 string                       `yaml:"changelog_pipe_type"`
	ChangelogTopicNameTemplateDefault string                       `yaml:"changelog_topic_name_template_default"`
	ChangelogTopicNameTemplate        map[string]map[string]string `yaml:"changelog_topic_name_template"`
	ChangelogWatchdogInterval         time.Duration

	OutputTopicNameTemplateDefault string                       `yaml:"output_topic_name_template_default"`
	OutputTopicNameTemplate        map[string]map[string]string `yaml:"output_topic_name_template"`

	Verbose bool

	ChangelogBuffer       bool `yaml:"changelog_buffer"`
	ForceMasterConnection bool `yaml:"force_master_connection"`

	InternalEncoding string `yaml:"internal_encoding"`

	TableParams `yaml:",inline"` //Merged with table specific config if any

	Filters map[string]map[string]RowFilter `yaml:"filters"`

	// ConfigRefreshInterval is how long Get serves the cached config before
	// attempting a reload.
	ConfigRefreshInterval time.Duration `yaml:"config_refresh_interval"`
}

// EncryptionConfig holds encryption configuration options
type EncryptionConfig struct {
	Enabled    bool
	PublicKey  string `yaml:"public_key"`  // used to encrypt in producer
	PrivateKey string `yaml:"private_key"` // used to decrypt in consumer
	SigningKey string `yaml:"signing_key"` // used to sign in producer and verify in consumer
}

// PipeConfig holds pipe configuration options
type PipeConfig struct {
	MaxBatchSize      int    `yaml:"max_batch_size"`
	MaxBatchSizeBytes int    `yaml:"max_batch_size_bytes"`
	BaseDir           string `yaml:"base_dir"`

	MaxFileSize     int64 `yaml:"max_file_size"`      // file size on disk
	MaxFileDataSize int64 `yaml:"max_file_data_size"` //uncompressed data size

	Compression bool

	//Delimited enables producing delimited message to text files and length
	//prepended messages to binary files
	FileDelimited bool `yaml:"file_delimited"`

	NonBlocking bool `yaml:"non_blocking"`

	EndOfStreamMark bool

	Encryption EncryptionConfig

	S3     S3Config
	Hadoop HadoopConfig
	Kafka  KafkaConfig
	SQL    SQLConfig `yaml:"sql"`
}

// KafkaConfig holds Kafka pipe configuration
type KafkaConfig struct {
	Addresses       []string
	MaxMessageBytes int `yaml:"max_message_bytes"`
}

// S3Config holds S3 output pipe configuration
type S3Config struct {
	Region          string
	Endpoint        string
	Bucket          string
	BaseDir         string `yaml:"base_dir"`
	AccessKeyID     string `yaml:"access_key_id"`
	SecretAccessKey string `yaml:"secret_access_key"`
	SessionToken    string `yaml:"session_token"`
	Timeout         time.Duration
}

// HadoopConfig holds hadoop output pipe configuration
type HadoopConfig struct {
	User      string
	Addresses []string
	BaseDir   string `yaml:"base_dir"`
}

// SQLConfig holds SQL output pipe configuration
type SQLConfig struct {
	Type    string
	DSN     string `yaml:"dsn"`
	Service string
	Cluster string
	DB      string
}

// ScheduleConfig holds snapshot schedule parameters
type ScheduleConfig struct {
	// Interval is specified as a plain number of seconds in the loaded
	// config; parseConfig multiplies it by time.Second.
	Interval time.Duration //seconds. TODO: Implement proper duration unmarshalling
	// Retention time.Duration
}

// TableParams holds per table configuration options
type TableParams struct {
	ClusterConcurrency int   `yaml:"cluster_concurrency"`
	ThrottleTargetMB   int64 `yaml:"throttle_target_mb"`
	ThrottleTargetIOPS int64 `yaml:"throttle_target_iops"`

	Pipe     PipeConfig
	Schedule ScheduleConfig

	RowFilter  RowFilter   `yaml:"row_filter"`
	RowFilters []RowFilter `yaml:"row_filters"` // only used in table params

	ForceIndex string `yaml:"force_index"`
	NoSnapshot bool   `yaml:"no_snapshot"`

	// Produce just insert event on update
	NoDeleteOnUpdate bool `yaml:"no_delete_on_update"`
}

// RowFilter has the condition, column name & values on which filter will be applied
type RowFilter struct {
	Column    string   `yaml:"column"`
	Values    []string `yaml:"values"`
	Condition string   `yaml:"condition"`
	Operator  string   `yaml:"operator"`
}

// AppConfig is the config struct which the config gets loaded into.
// It embeds the serialized AppConfigODS and carries the topic name templates
// parsed from it by parseConfig.
type AppConfig struct {
	AppConfigODS

	PortDyn int

	ChangelogTopicNameTemplateParsed        map[string]map[string]*template.Template
	OutputTopicNameTemplateParsed           map[string]map[string]*template.Template
	ChangelogTopicNameTemplateDefaultParsed *template.Template
	OutputTopicNameTemplateDefaultParsed    *template.Template
}

// log writes a formatted message to stderr. The write result is deliberately
// ignored: there is nowhere else to report a failure to log.
func log(format string, args ...interface{}) {
	_, _ = fmt.Fprintf(os.Stderr, format, args...)
}

// getDefaultConfig returns a freshly allocated config populated with the
// built-in defaults. Load starts from this value before applying the loaders.
func getDefaultConfig() *AppConfigODS {
	return &AppConfigODS{
		Port:        7836,
		MaxNumProcs: runtime.NumCPU(),

		StateDBName:         types.MyDBName,
		StateClusterName:    types.MyClusterName,
		StateUpdateInterval: 300 * time.Second,
		WorkerIdleInterval:  30 * time.Second,

		ChangelogPipeType: "kafka",

		ChangelogTopicNameTemplateDefault: types.MySvcName + ".service.{{.Service}}.db.{{.DB}}.table.{{.Table}}{{if .Version}}.v{{.Version}}{{end}}",

		OutputTopicNameTemplateDefault: "hp-tap-{{.Service}}-{{.DB}}-{{.Table}}{{if .Version}}-v{{.Version}}{{end}}",

		ChangelogBuffer:           true,
		ChangelogWatchdogInterval: 300 * time.Second,

		LogType:  "std",
		LogLevel: "info",

		InternalEncoding: "json",

		TableParams: TableParams{
			Pipe: PipeConfig{
				BaseDir:           fmt.Sprintf("/var/lib/%s", types.MySvcName),
				MaxFileSize:       1024 * 1024 * 1024,
				MaxBatchSize:      4096,
				MaxBatchSizeBytes: 32 * 1024 * 1024,
				FileDelimited:     true,
				S3: S3Config{
					Timeout: 7 * 24 * time.Hour,
				},
				Kafka: KafkaConfig{
					MaxMessageBytes: 30 * 1024 * 1024,
				},
			},
		},

		ConfigRefreshInterval: 300 * time.Second,
	}
}

//Define various environments
const (
	EnvProduction  = "production"
	EnvDevelopment = "development"
	EnvStaging     = "staging"
	EnvTest        = "test"
)

// configLoader is the interface every config backend implements.
type configLoader interface {
	environment() string
	load(*AppConfig) error
	loadSection(interface{}) error
	zone() string
	save(*AppConfig) error
	parseConfig(cfg *AppConfig) error
}

//TODO: implement cmdline and env loaders
// loaders are applied in order by Load, Set, LoadSection and Save.
var loaders = []configLoader{&std{loadFn: stdReadFile}, &std{loadFn: mysqlRead, saveFn: mysqlWrite}}

// cfg holds the current *AppConfig; updatedAt holds the time.Time of the last
// successful refresh (or save).
var cfg atomic.Value
var updatedAt atomic.Value

// Get returns current config. Reloads it if refresh interval is expired.
//
// The cached config is returned as long as it was refreshed less than
// ConfigRefreshInterval ago. Otherwise Load is called. If the reload fails
// and a previously loaded config exists, the error is logged to stderr and the
// stale config is returned; the refresh timestamp is not advanced in that
// case, so the next call attempts the reload again. If the reload fails and
// no config has ever been loaded, exit(1) is called and nil is returned.
func Get() *AppConfig {
	c := cfg.Load()
	if c != nil {
		ac := c.(*AppConfig)
		updAt := updatedAt.Load()
		if updAt != nil && updAt.(time.Time).After(time.Now().Add(-ac.ConfigRefreshInterval)) {
			return ac
		}
	}

	err := Load()
	// Re-read: Load stores the new config on success, and leaves the previous
	// one (if any) in place on failure.
	c = cfg.Load()
	if err != nil {
		log("error (re)loading config: %v\n", err.Error())
		if c == nil {
			exit(1)
			return nil // exit can be remapped in tests, so it doesn't stop program execution
		}
	} else {
		updatedAt.Store(time.Now())
	}

	return c.(*AppConfig)
}

// Set replaces current config with the contents of parameter.
//
// The value pointed to by a is copied, run through every loader's parseConfig
// and then the package-level parseConfig, and stored as the current config
// only if all of them succeed. Set does not modify the refresh timestamp used
// by Get. A nil a is rejected with an error.
func Set(a *AppConfigODS) error {
	if a == nil {
		return fmt.Errorf("nil config passed to Set")
	}

	c := &AppConfig{AppConfigODS: *a}

	for _, l := range loaders {
		if err := l.parseConfig(c); err != nil {
			return err
		}
	}

	if err := parseConfig(c); err != nil {
		return err
	}

	cfg.Store(c)

	return nil
}

// Load creates the config.
//
// It starts from the built-in defaults, lets every loader in turn load into
// and parse the config, runs the package-level parseConfig, and stores the
// result as the current config. On the first error nothing is stored and the
// error is returned.
func Load() error {
	c := &AppConfig{AppConfigODS: *getDefaultConfig()}

	for _, l := range loaders {
		if err := l.load(c); err != nil {
			return err
		}
		if err := l.parseConfig(c); err != nil {
			return err
		}
	}

	if err := parseConfig(c); err != nil {
		return err
	}

	cfg.Store(c)

	return nil
}

// LoadSection can be used to load subsections of config files at runtime.
// The destination is passed to every loader in order; the first error aborts.
func LoadSection(cfg interface{}) error {
	for _, l := range loaders {
		if err := l.loadSection(cfg); err != nil {
			return err
		}
	}
	return nil
}

// Environment returns current environment, as reported by the first loader.
func Environment() string {
	return loaders[0].environment()
}

// Save saves config to backends.
//
// It is a no-op when no config has been loaded or set yet. The refresh
// timestamp used by Get is updated after each loader that saves successfully;
// the first failing loader aborts the save and its error is returned.
func Save() error {
	c := cfg.Load()
	if c == nil {
		return nil
	}

	ac := c.(*AppConfig)
	for _, l := range loaders {
		if err := l.save(ac); err != nil {
			return err
		}
		updatedAt.Store(time.Now())
	}

	return nil
}

// Zone returns the current zone that the application is running in.
// It is the first non-empty zone reported by a loader, or "" if none.
func Zone() string {
	for _, l := range loaders {
		if z := l.zone(); z != "" {
			return z
		}
	}
	return ""
}

// parseTemplates parses every template string of a two-level map and returns
// a map of the same shape holding the parsed templates. Each template is
// named "<outer key>.<inner key>". The first parse error aborts and is
// returned with a nil map.
func parseTemplates(tmplMap map[string]map[string]string) (map[string]map[string]*template.Template, error) {
	parsedMap := make(map[string]map[string]*template.Template)

	for i, inp := range tmplMap {
		for o, out := range inp {
			t, err := template.New(i + "." + o).Parse(out)
			if err != nil {
				return nil, err
			}
			// Inner maps are created lazily, so an outer key with no
			// entries does not appear in the result.
			if parsedMap[i] == nil {
				parsedMap[i] = make(map[string]*template.Template)
			}
			parsedMap[i][o] = t
		}
	}

	return parsedMap, nil
}

// parseConfig fills in the derived fields of c: it parses the output and
// changelog topic name templates (per-entry maps and defaults), converts
// Schedule.Interval from a number of seconds to a time.Duration, and defaults
// LockExpireTimeout when it is zero.
func parseConfig(c *AppConfig) error {
	t, err := parseTemplates(c.OutputTopicNameTemplate)
	if err != nil {
		return err
	}
	c.OutputTopicNameTemplateParsed = t

	t, err = parseTemplates(c.ChangelogTopicNameTemplate)
	if err != nil {
		return err
	}
	c.ChangelogTopicNameTemplateParsed = t

	td, err := template.New("otntd").Parse(c.OutputTopicNameTemplateDefault)
	if err != nil {
		return err
	}
	c.OutputTopicNameTemplateDefaultParsed = td

	td, err = template.New("ctntd").Parse(c.ChangelogTopicNameTemplateDefault)
	if err != nil {
		return err
	}
	c.ChangelogTopicNameTemplateDefaultParsed = td

	// The interval is multiplied unconditionally on every call.
	// NOTE(review): a config that already went through parseConfig and is
	// passed back via Set would be scaled a second time - confirm callers.
	c.TableParams.Schedule.Interval *= time.Second

	//By default allow to pass 2 state update intervals to consider lock as stale
	if c.LockExpireTimeout == 0 {
		c.LockExpireTimeout = 2 * c.StateUpdateInterval
	}

	return nil
}

// sanitizeForLog replaces a secret with a placeholder that only reveals
// whether the value is set.
func sanitizeForLog(s string) string {
	if s != "" {
		return "<hidden>"
	}
	return "<empty>"
}

// String sanitizes config for log output: key material is never printed,
// only whether each key is set.
func (e EncryptionConfig) String() string {
	return fmt.Sprintf("{Enabled:%v, PublicKey:%v, PrivateKey:%v, SigningKey:%v}", e.Enabled, sanitizeForLog(e.PublicKey), sanitizeForLog(e.PrivateKey), sanitizeForLog(e.SigningKey))
}

// CopyForMerge clear all compound fields in preparation for merge by json.Unmarshal.
//
// It returns a shallow copy of t in which Pipe.Hadoop.Addresses,
// Pipe.Kafka.Addresses and RowFilter.Values are nil; t itself is not modified.
// NOTE(review): RowFilters is copied as is and not cleared - confirm that is
// intended.
func (t *TableParams) CopyForMerge() *TableParams {
	tp := *t
	tp.Pipe.Hadoop.Addresses = nil
	tp.Pipe.Kafka.Addresses = nil
	tp.RowFilter.Values = nil
	return &tp
}

// MergeCompound restores compound fields from original structure if they were
// empty in merged config.
//
// For each of Pipe.Hadoop.Addresses, Pipe.Kafka.Addresses and
// RowFilter.Values, the slice from r is assigned to t when t's is empty and
// r's is not. The slices are shared with r, not copied.
func (t *TableParams) MergeCompound(r *TableParams) {
	if len(t.Pipe.Hadoop.Addresses) == 0 && len(r.Pipe.Hadoop.Addresses) != 0 {
		t.Pipe.Hadoop.Addresses = r.Pipe.Hadoop.Addresses
	}
	if len(t.Pipe.Kafka.Addresses) == 0 && len(r.Pipe.Kafka.Addresses) != 0 {
		t.Pipe.Kafka.Addresses = r.Pipe.Kafka.Addresses
	}
	if len(t.RowFilter.Values) == 0 && len(r.RowFilter.Values) != 0 {
		t.RowFilter.Values = r.RowFilter.Values
	}
}