subscriber/config/service_config.go (176 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"errors"
"fmt"
"sync"
"time"
"github.com/m3db/m3/src/cluster/client/etcd"
"github.com/uber-go/tally"
"github.com/uber/aresdb/client"
"github.com/uber/aresdb/utils"
cfgfx "go.uber.org/config"
"go.uber.org/fx"
"go.uber.org/zap"
)
var (
// ActiveAresNameSpace is current namespace of a list of Ares Supported in current service
ActiveAresNameSpace string
// ActiveJobNameSpace is current namespace of a list of jobs Supported in current service
ActiveJobNameSpace string
// ConfigRootPath is the root path of config
ConfigRootPath string = "config"
// ConfigFile is the file path of config file
ConfigFile string
// Module configures an HTTP server.
Module = fx.Options(
fx.Provide(
NewServiceConfig,
),
)
// SinkIsAresDB is a flag. It is true if sink is aresDB
SinkIsAresDB = false
sinkModeStr = map[string]SinkMode{
"undefined": Sink_Undefined,
"aresDB": Sink_AresDB,
"kafka": Sink_Kafka,
}
// EtcdCfgEvent is used to detect etcd cluster changes
EtcdCfgEvent = make(chan int, 1)
)
// Params defines the base objects for a service.
type Params struct {
fx.In
Environment utils.EnvironmentContext
Logger *zap.Logger
Scope tally.Scope
Config cfgfx.Provider
}
// Result defines the objects that the config module provides.
type Result struct {
fx.Out
ServiceConfig ServiceConfig
}
// ServiceConfig defines the service configuration.
type ServiceConfig struct {
Environment utils.EnvironmentContext
Logger *zap.Logger
Scope tally.Scope
Config cfgfx.Provider
Service string `yaml:"service.name"`
BackendPort int `yaml:"rest.http.address"`
AresNSConfig AresNSConfig `yaml:"ares"`
JobNSConfig JobNSConfig `yaml:"jobs"`
ActiveAresClusters map[string]SinkConfig `yaml:"-"`
ActiveJobs []string `yaml:"-"`
ControllerConfig *ControllerConfig `yaml:"controller"`
ZooKeeperConfig ZooKeeperConfig `yaml:"zookeeper"`
EtcdConfig EtcdConfig `yaml:"etcd"`
EtcdClustersConfig []EtcdClusterConfig `yaml:"etcd.etcdClusters"`
HeartbeatConfig *HeartBeatConfig `yaml:"heartbeat"`
}
// HeartBeatConfig represents heartbeat config
type HeartBeatConfig struct {
Enabled bool `yaml:"enabled"`
Timeout int `yaml:"timeout"`
Interval int `yaml:"interval"`
CheckInterval int `yaml:"checkInterval"`
}
type EtcdConfig struct {
*sync.Mutex
EtcdConfig etcd.Configuration `yaml:",inline"`
}
type EtcdClusterConfig struct {
EtcdCluster etcd.ClusterConfig `yaml:",inline"`
UNS string `yaml:"uns"`
}
// SinkMode defines the subscriber sink mode
type SinkMode int
const (
Sink_Undefined SinkMode = iota
Sink_AresDB
Sink_Kafka
)
// SinkConfig wraps sink configurations
type SinkConfig struct {
// SinkMode defines the subscriber sink mode
SinkModeStr string `yaml:"sinkMode" json:"sinkMode"`
// AresDBConnectorConfig defines aresDB client config
AresDBConnectorConfig client.ConnectorConfig `yaml:"aresDB" json:"aresDB"`
// KafkaProducerConfig defines Kafka producer config
KafkaProducerConfig KafkaProducerConfig `yaml:"kafkaProducer" json:"kafkaProducer"`
}
// KafkaProducerConfig represents Kafka producer configuration
type KafkaProducerConfig struct {
// Brokers defines a list of broker addresses separated by comma
Brokers string `yaml:"brokers" json:"brokers"`
// RetryMax is the max number of times to retry sending a message (default 3).
RetryMax int `yaml:"retryMax" json:"retryMax"`
// TimeoutInMSec is the max duration the broker will wait
// the receipt of the number of RequiredAcks (defaults to 10 seconds)
TimeoutInSec int `yaml:"timeoutInSec" json:"timeoutInSec"`
// SchemaRefreshInterval is the interval in seconds for the connector to
// fetch and refresh schema from ares
// if <= 0, will use default
SchemaRefreshInterval int `yaml:"schemaRefreshInterval" json:"schemaRefreshInterval"`
}
// AresNSConfig defines the mapping b/w ares namespace and its clusters
type AresNSConfig struct {
AresNameSpaces map[string][]string `yaml:"namespaces"`
AresClusters map[string]SinkConfig `yaml:"clusters"`
}
// JobNSConfig defines the mapping b/w job namespace and its clusters
type JobNSConfig struct {
Jobs map[string][]string `yaml:"namespaces"`
}
// ControllerConfig defines aresDB controller configuration
type ControllerConfig struct {
// Enable defines whether to enable aresDB controll or not
Enable bool `yaml:"enable" default:"false"`
// Address is aresDB controller address
Address string `yaml:"address" default:"localhost:5436"`
// Timeout is request sent to aresDB controller timeout in seconds
Timeout int `yaml:"timeout" default:"30"`
// RefreshInterval is the interval to sync up with aresDB controller in minutes
RefreshInterval int `yaml:"refreshInterval" default:"10"`
// ServiceName is aresDB controller name
ServiceName string `yaml:"serviceName" default:"ares-controller"`
}
// ZooKeeperConfig defines the ZooKeeper client configuration
type ZooKeeperConfig struct {
// Server defines zookeeper server addresses
Server string `yaml:"server"`
SessionTimeoutSeconds time.Duration `yaml:"sessionTimeoutSeconds" default:"60"`
ConnectionTimeoutSeconds time.Duration `yaml:"connectionTimeoutSeconds" default:"15"`
BaseSleepTimeSeconds time.Duration `yaml:"exponentialBackoffRetryPolicy.baseSleepTimeSeconds" default:"1"`
MaxRetries int `yaml:"exponentialBackoffRetryPolicy.maxRetries" default:"3"`
MaxSleepSeconds time.Duration `yaml:"exponentialBackoffRetryPolicy.maxSleepSeconds" default:"15"`
}
// NewServiceConfig constructs ServiceConfig.
func NewServiceConfig(p Params) (Result, error) {
raw := p.Config.Get(cfgfx.Root)
serviceConfig := ServiceConfig{}
if err := raw.Populate(&serviceConfig); err != nil {
return Result{
ServiceConfig: serviceConfig,
}, err
}
raw = p.Config.Get("etcd.etcdClusters")
if err := raw.Populate(&serviceConfig.EtcdClustersConfig); err != nil {
return Result{
ServiceConfig: serviceConfig,
}, err
}
// etcd key format: prefix/${env}/namespace/service/instanceId
etcdConfig := &serviceConfig.EtcdConfig
etcdConfig.Mutex = &sync.Mutex{}
etcdConfig.EtcdConfig.Env = fmt.Sprintf("%s/%s", etcdConfig.EtcdConfig.Env, ActiveJobNameSpace)
serviceConfig.Environment = p.Environment
serviceConfig.Logger = p.Logger
serviceConfig.Scope = p.Scope.Tagged(map[string]string{
"deployment": p.Environment.Deployment,
"dc": p.Environment.Zone,
"application": p.Environment.ApplicationID,
})
serviceConfig.Config = p.Config
serviceConfig.ActiveAresClusters = make(map[string]SinkConfig)
// set serviceConfig.ActiveAresClusters
if (serviceConfig.AresNSConfig.AresClusters == nil || serviceConfig.AresNSConfig.AresNameSpaces == nil) &&
!serviceConfig.ControllerConfig.Enable {
return Result{
ServiceConfig: serviceConfig,
}, errors.New("Ares namespaces and clusters must be configured")
}
activeAresClusters := serviceConfig.AresNSConfig.AresNameSpaces[ActiveAresNameSpace]
if activeAresClusters != nil {
for _, cluster := range activeAresClusters {
serviceConfig.ActiveAresClusters[cluster] = serviceConfig.AresNSConfig.AresClusters[cluster]
}
if len(serviceConfig.ActiveAresClusters) == 0 {
return Result{
ServiceConfig: serviceConfig,
}, fmt.Errorf("No ares cluster configure is found for namespace %s", ActiveAresNameSpace)
}
} else if !serviceConfig.ControllerConfig.Enable {
return Result{
ServiceConfig: serviceConfig,
}, fmt.Errorf("No ares clusters are defined for namespace %s", ActiveAresNameSpace)
}
// set serviceConfig.ActiveJobs
if serviceConfig.JobNSConfig.Jobs == nil && !serviceConfig.ControllerConfig.Enable {
return Result{
ServiceConfig: serviceConfig,
}, errors.New("Job namespace config not found")
}
serviceConfig.ActiveJobs = serviceConfig.JobNSConfig.Jobs[ActiveJobNameSpace]
return Result{
ServiceConfig: serviceConfig,
}, nil
}
func (s SinkConfig) GetSinkMode() SinkMode {
if val, ok := sinkModeStr[s.SinkModeStr]; ok {
return val
}
return Sink_Undefined
}