config/config.go (532 lines of code) (raw):
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package config
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"runtime"
"strings"
"sync"
"time"
"github.com/uber/arachne/defines"
"github.com/uber/arachne/internal/log"
"github.com/uber/arachne/internal/network"
"github.com/uber/arachne/internal/tcp"
"github.com/uber/arachne/metrics"
"github.com/google/gopacket/layers"
cli "github.com/jawher/mow.cli"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/validator.v2"
"gopkg.in/yaml.v2"
)
// defaultConfigFile is the YAML configuration file path used as the default
// value of the "c config_file" command-line option (see ParseCliArgs).
const defaultConfigFile = "/usr/local/etc/arachne/arachne.yaml"
// BasicConfig holds the basic parameter configurations for the application.
// It is the decoding target for the YAML configuration file (see
// unmarshalBasicConfig).
type BasicConfig struct {
// Logging is decoded from the top-level "logging" YAML key.
Logging log.Config `yaml:"logging"`
// Arachne is decoded from the top-level "arachne" YAML key.
Arachne ArachneConfiguration `yaml:"arachne"`
}
// ArachneConfiguration contains specific configuration to Arachne.
// Get copies each of its fields into the AppConfig it returns.
type ArachneConfiguration struct {
// PIDPath is decoded from the "pidPath" YAML key.
PIDPath string `yaml:"pidPath"`
// Orchestrator holds the Orchestrator connection settings.
Orchestrator OrchestratorConfig `yaml:"orchestrator"`
// StandaloneTargetConfig is the path of the static target list file that
// FetchRemoteList reads when Orchestrator mode is disabled. Get rejects a
// configuration where it is empty and the Orchestrator is not enabled.
StandaloneTargetConfig string `yaml:"standaloneTargetConfig"`
}
// OrchestratorConfig contains configuration for the Arachne Orchestrator.
type OrchestratorConfig struct {
// Enabled selects Orchestrator mode in FetchRemoteList; when false the
// standalone target file is used instead.
Enabled bool `yaml:"enabled"`
// AddrPort is placed directly after "http://" when refreshRemoteList
// builds the request URL.
AddrPort string `yaml:"addrport"`
// RESTVersion is the first path segment of that request URL.
RESTVersion string `yaml:"restVersion"`
}
// Extended holds the parameter configurations implemented by outside callers.
type Extended struct {
// Metrics is asked by Get (via its UnmarshalConfig method) to decode the
// metrics section of the same YAML file.
Metrics metrics.Opt
}
// RemoteStore holds all Remotes. Entries are keyed by the textual form of the
// target's IP address (walkTargets stores them under net.IP.String()).
type RemoteStore map[string]Remote
// Remote holds the info for every target to be echoed.
type Remote struct {
// IP is the parsed target address.
IP net.IP
// AF is the value returned by network.Family for IP.
AF string
// Hostname is the target's host name; ResolveDNSTargets may overwrite it
// for targets that are not External.
Hostname string
// Location is copied from the target's entry in the JSON config file.
Location string
// External is true for targets listed under "external" in the JSON
// config file and false for those listed under "internal".
External bool
}
// target is one entry of the "internal" or "external" lists in the JSON
// target configuration file.
type target struct {
HostName string `json:"host_name"`
IP string `json:"ip"`
// Location carries no JSON tag, so encoding/json matches it against the
// field name ("Location"), case-insensitively.
Location string
}
// RemoteFileConfig needed for the JSON decoder to know which fields to expect and parse.
// It mirrors the raw JSON target configuration; readRemoteList converts it
// into a RemoteConfig and a RemoteStore.
type RemoteFileConfig struct {
// Local describes the agent itself.
Local struct {
Location string `json:"location"`
HostName string `json:"host_name"`
SrcAddress string `json:"src_address"`
InterfaceName string `json:"interface_name"`
TargetTCPPort layers.TCPPort `json:"target_tcp_port"`
// Timeout is parsed with time.ParseDuration.
Timeout string `json:"timeout"`
// BaseSrcTCPPort is the first port of the source port range and
// NumSrcTCPPorts the number of ports in it.
BaseSrcTCPPort layers.TCPPort `json:"base_src_tcp_port"`
NumSrcTCPPorts uint16 `json:"num_src_tcp_ports"`
// BatchInterval is parsed with time.ParseDuration.
BatchInterval string `json:"batch_interval"`
// QoSEnabled and ResolveDNS are interpreted by isTrue: "enabled" or
// "true" (any letter case) mean true, everything else false.
QoSEnabled string `json:"qos"`
ResolveDNS string `json:"resolve_dns"`
// DNSServersAlt is a comma-separated list of IP addresses.
DNSServersAlt string `json:"dns_servers_alternate"`
// Both poll intervals are parsed with time.ParseDuration.
PollOrchestratorIntervalSuccess string `json:"poll_orchestrator_interval_success"`
PollOrchestratorIntervalFailure string `json:"poll_orchestrator_interval_failure"`
} `json:"local"`
// Internal and External list the targets; targets from External are
// stored with Remote.External set to true.
Internal []target `json:"internal"`
External []target `json:"external"`
}
// AppConfig holds the info parsed from the local YAML config file.
// It is assembled by Get.
type AppConfig struct {
// Logging is the zap configuration built by Get from the "logging"
// section and the CLI foreground flag.
Logging *zap.Config
// NOTE(review): Verbose is not set by Get; presumably assigned elsewhere —
// confirm.
Verbose bool
PIDPath string
Orchestrator OrchestratorConfig
StandaloneTargetConfig string
// Metrics is the value returned by Extended.Metrics.UnmarshalConfig.
Metrics metrics.Config
}
// CLIConfig holds the info parsed from CLI.
// Every field is the pointer handed out by the CLI library when the
// corresponding option is declared in ParseCliArgs.
type CLIConfig struct {
// ConfigFile is the "c config_file" option; defaults to defaultConfigFile.
ConfigFile *string
// Foreground is the "foreground" option; Get logs to stdout when it is set.
Foreground *bool
// ReceiverOnlyMode is the "receiver_only" option.
ReceiverOnlyMode *bool
// SenderOnlyMode is the "sender_only" option.
SenderOnlyMode *bool
}
// RemoteConfig holds the info parsed from the JSON config file.
// It is populated by readRemoteList from the "local" section of
// RemoteFileConfig.
type RemoteConfig struct {
// Location and HostName are stored lower-cased. When the file gives no
// host name, HostName is taken from GetHostname.
Location string
HostName string
// SrcAddress is the source address returned by network.GetSourceAddr.
SrcAddress net.IP
// SrcTCPPortRange holds the first ([0]) and last ([1]) source TCP port.
SrcTCPPortRange tcp.PortRange
// InterfaceName is stored lower-cased.
InterfaceName string
// TargetTCPPort must not lie within SrcTCPPortRange.
TargetTCPPort layers.TCPPort
Timeout time.Duration
// BatchInterval is rejected when shorter than the minimum passed to
// readRemoteList.
BatchInterval time.Duration
QoSEnabled bool
ResolveDNS bool
// DNSServersAlt are the alternate DNS servers parsed from the
// comma-separated "dns_servers_alternate" value.
DNSServersAlt []net.IP
PollOrchestratorInterval pollInterval
}
// pollInterval holds the polling interval info.
type pollInterval struct {
// Success is the interval reported after a target list was downloaded
// and the retry interval applied after an HTTP 404 in refreshRemoteList.
Success time.Duration
// Failure is the initial retry interval used by refreshRemoteList.
Failure time.Duration
}
// Global holds the global application info.
type Global struct {
// App is the configuration parsed from the YAML file.
App *AppConfig
// CLI is the configuration parsed from the command line.
CLI *CLIConfig
// RemoteConfig is replaced with a fresh value on every FetchRemoteList
// call and then filled in by readRemoteList.
RemoteConfig *RemoteConfig
// Remotes is the current target list; it is only assigned after a target
// list has been parsed successfully.
Remotes RemoteStore
}
// localFileReadable checks that the file at path can be read in full.
// It returns nil on success and the read error otherwise; the file contents
// are discarded.
func localFileReadable(path string) error {
	_, readErr := ioutil.ReadFile(path)
	return readErr
}
// ParseCliArgs provides the usage and help menu, and parses the actual arguments.
//
// service is used as the application name and version is appended to the
// "Arachne " version banner. The returned CLIConfig holds the option pointers
// handed out by the CLI library while the options are declared. An error
// returned while running the CLI parser over os.Args is logged instead of
// being silently dropped; the (possibly default-valued) CLIConfig is still
// returned so the signature stays unchanged.
func ParseCliArgs(logger *log.Logger, service string, version string) *CLIConfig {
	args := new(CLIConfig)

	app := cli.App(service, "Utility to echo the DC and Cloud Infrastructure")
	app.Version("v version", "Arachne "+version)
	app.Spec = "[--foreground] [-c=<config_file>] [--receiver_only] [--sender_only]"

	args.Foreground = app.BoolOpt("foreground", false, "Force foreground mode")
	args.ConfigFile = app.StringOpt("c config_file", defaultConfigFile,
		fmt.Sprintf("Agent's primary yaml configuration file (by default: %s)", defaultConfigFile))
	args.ReceiverOnlyMode = app.BoolOpt("receiver_only", false, "Force TCP receiver-only mode")
	args.SenderOnlyMode = app.BoolOpt("sender_only", false, "Force TCP sender-only mode")

	app.Action = func() {
		logger.Debug("Command line arguments parsed")
	}

	if err := app.Run(os.Args); err != nil {
		logger.Error("Failed to parse command line arguments", zap.Error(err))
	}
	return args
}
// Get reads the YAML configuration file named by cc.ConfigFile and builds the
// application configuration from it.
//
// The same file contents are decoded twice: once into a BasicConfig and once
// by ec.Metrics.UnmarshalConfig for the metrics settings. The zap logging
// configuration writes to stdout when logging to stdout is requested in the
// file or the foreground CLI flag is set, and to stderr otherwise; a
// configured log sink is added as a further output. An unparsable log level
// is logged and the zero-value level is used.
//
// An error is returned when the file cannot be read or decoded, or when
// neither Orchestrator mode nor a standalone target file is configured.
func Get(cc *CLIConfig, ec *Extended, logger *log.Logger) (*AppConfig, error) {
	path := *cc.ConfigFile

	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	basic, err := unmarshalBasicConfig(raw, path)
	if err != nil {
		return nil, err
	}

	metricsCfg, err := ec.Metrics.UnmarshalConfig(raw, path, logger.Logger)
	if err != nil {
		return nil, err
	}

	// Primary log destination first, optional sink appended after it.
	var sinks []string
	if basic.Logging.StdOut || *cc.Foreground {
		sinks = []string{"stdout"}
	} else {
		sinks = []string{"stderr"}
	}
	if sink := basic.Logging.LogSink; sink != "" {
		logger.Info("Log file path provided", zap.String("path", sink))
		sinks = append(sinks, sink)
	}

	// GetHostname already logs and substitutes "unknown" on failure.
	host, _ := GetHostname(logger)

	var lvl zapcore.Level
	if setErr := lvl.Set(basic.Logging.Level); setErr != nil {
		logger.Error("Log level provided", zap.Error(setErr))
	}

	zapCfg := zap.Config{
		Level:            zap.NewAtomicLevelAt(lvl),
		Development:      false,
		DisableCaller:    true,
		EncoderConfig:    zap.NewProductionEncoderConfig(),
		Encoding:         "json",
		ErrorOutputPaths: []string{"stdout"},
		OutputPaths:      sinks,
		InitialFields: map[string]interface{}{
			"service_name": defines.ArachneService,
			"hostname":     host,
			"PID":          os.Getpid(),
		},
	}

	arachne := basic.Arachne
	if !arachne.Orchestrator.Enabled && arachne.StandaloneTargetConfig == "" {
		return nil, errors.New("the standalone-mode target configuration file has not been specified")
	}

	return &AppConfig{
		Logging:                &zapCfg,
		PIDPath:                arachne.PIDPath,
		Orchestrator:           arachne.Orchestrator,
		StandaloneTargetConfig: arachne.StandaloneTargetConfig,
		Metrics:                metricsCfg,
	}, nil
}
// unmarshalBasicConfig decodes the YAML document in data into a BasicConfig
// and then runs struct validation on the decoded value. fname is used only to
// label the returned errors.
func unmarshalBasicConfig(data []byte, fname string) (*BasicConfig, error) {
	var parsed BasicConfig

	if decodeErr := yaml.Unmarshal(data, &parsed); decodeErr != nil {
		return nil, errors.Wrapf(decodeErr, "error unmarshaling the configuration file %s", fname)
	}

	// Validation runs on the fully decoded configuration.
	if validateErr := validator.Validate(&parsed); validateErr != nil {
		return nil, errors.Wrapf(validateErr, "invalid info in configuration file %s", fname)
	}

	return &parsed, nil
}
// FetchRemoteList loads the target list either from the standalone target
// file or, when Orchestrator mode is enabled, from the Arachne Orchestrator.
//
// gl.RemoteConfig is replaced with a fresh RemoteConfig before anything is
// fetched. In standalone mode an unreadable or unparsable target file is
// reported through logger.Fatal, and gl.Remotes is set once parsing succeeds.
// In Orchestrator mode the poll intervals are seeded with 2h (success) and
// 2m (failure) and the result of refreshRemoteList is returned.
func FetchRemoteList(
	gl *Global,
	maxNumRemoteTargets int,
	maxNumSrcTCPPorts uint16,
	minBatchInterval time.Duration,
	HTTPResponseHeaderTimeout time.Duration,
	orchestratorRESTConf string,
	kill chan struct{},
	logger *log.Logger,
) error {
	gl.RemoteConfig = new(RemoteConfig)

	// Collects every remote target found in the JSON configuration.
	targets := make(RemoteStore, maxNumRemoteTargets)

	if gl.App.Orchestrator.Enabled {
		logger.Info("Orchestrator mode enabled")

		// Placeholder intervals until a JSON file has been parsed.
		gl.RemoteConfig.PollOrchestratorInterval = pollInterval{
			Success: 2 * time.Hour,
			Failure: 2 * time.Minute,
		}
		return refreshRemoteList(gl, targets, maxNumSrcTCPPorts, minBatchInterval,
			HTTPResponseHeaderTimeout, orchestratorRESTConf, kill, logger)
	}

	// Standalone (non-Orchestrator) mode.
	targetFile := gl.App.StandaloneTargetConfig
	logger.Debug("Orchestrator mode disabled, using static target config file.",
		zap.String("path", targetFile))

	if readErr := localFileReadable(targetFile); readErr != nil {
		logger.Fatal("unable to retrieve local target configuration file",
			zap.String("file", targetFile),
			zap.Error(readErr))
	}
	logger.Info("Configuration file", zap.String("file", targetFile))

	contents, err := ioutil.ReadFile(targetFile)
	if err != nil {
		return errors.Wrap(err, "file error")
	}

	parseErr := readRemoteList(contents, gl.RemoteConfig, targets, maxNumSrcTCPPorts,
		minBatchInterval, logger)
	if parseErr != nil {
		logger.Fatal("error parsing default target list file",
			zap.String("file", targetFile),
			zap.Error(parseErr))
	}

	gl.Remotes = targets
	return nil
}
// createHTTPClient returns an HTTP client to connect to remote server.
//
// timeout bounds both establishing the connection and waiting for the
// response headers. disableKeepAlives is passed straight to the transport.
// The dialer is installed through DialContext rather than the deprecated Dial
// field so that cancelling a request also cancels a dial in progress.
//
// No overall http.Client.Timeout is set, so reading the response body is not
// bounded by timeout.
func createHTTPClient(timeout time.Duration, disableKeepAlives bool) *http.Client {
	dialer := &net.Dialer{Timeout: timeout}

	return &http.Client{
		Transport: &http.Transport{
			ResponseHeaderTimeout: timeout,
			DialContext:           dialer.DialContext,
			DisableKeepAlives:     disableKeepAlives,
		},
	}
}
// GetHostname returns the host name reported by the operating system.
// When the lookup fails, a warning is logged and the placeholder "unknown" is
// returned together with the error.
func GetHostname(logger *log.Logger) (string, error) {
	name, err := os.Hostname()
	if err == nil {
		return name, nil
	}

	logger.Warn("Failed to extract hostname from OS", zap.Error(err))
	return "unknown", err
}
// refreshRemoteList asks the Arachne Orchestrator for the target list and
// keeps retrying until one of the following happens:
//
//   - a list is downloaded (HTTP 200) and parsed: gl.RemoteConfig and remotes
//     are filled by readRemoteList, gl.Remotes is set to remotes and nil is
//     returned;
//   - the download or parsing fails while gl.Remotes is non-empty: nil is
//     returned so that the previously fetched list keeps being used;
//   - a value is received on kill: an error is returned.
//
// Between attempts it waits for retryTime, which starts as the configured
// failure interval. Each wait uses a one-shot timer that is released before
// the function returns; a ticker stopped through defer inside the loop would
// stay alive until the function returns and pile up, one per retry.
func refreshRemoteList(
	gl *Global,
	remotes RemoteStore,
	maxNumSrcTCPPorts uint16,
	minBatchInterval time.Duration,
	HTTPResponseHeaderTimeout time.Duration,
	orchestratorRESTConf string,
	kill chan struct{},
	logger *log.Logger,
) error {
	client := createHTTPClient(HTTPResponseHeaderTimeout, true)
	retryTime := gl.RemoteConfig.PollOrchestratorInterval.Failure

	for {
		// GetHostname logs the failure itself and falls back to "unknown".
		hostname, _ := GetHostname(logger)
		RESTReq := fmt.Sprintf("http://%s/%s/%s?hostname=%s",
			gl.App.Orchestrator.AddrPort,
			gl.App.Orchestrator.RESTVersion,
			orchestratorRESTConf,
			hostname)
		logger.Debug("Sending HTTP request to Orchestrator", zap.String("request", RESTReq))

		respCode, raw, err := fetchRemoteListFromOrchestrator(client, RESTReq, logger)
		if err == nil {
			switch respCode {
			case http.StatusOK:
				logger.Info("Target list downloaded successfully from Orchestrator",
					zap.String("addrport", gl.App.Orchestrator.AddrPort))
				err = readRemoteList(raw, gl.RemoteConfig, remotes, maxNumSrcTCPPorts,
					minBatchInterval, logger)
				if err != nil {
					logger.Error("error parsing downloaded configuration file",
						zap.Error(err))
					goto contError
				}
				gl.Remotes = remotes
				logger.Info("Will poll Orchestrator again later",
					zap.String("retry_time",
						gl.RemoteConfig.PollOrchestratorInterval.Success.String()))
				return nil
			case http.StatusNotFound:
				// NOTE(review): fetchRemoteListFromOrchestrator returns a
				// non-nil error for 404, so this branch is not reached as
				// written and a 404 is handled as a failed download below.
				// Confirm the intended idle-mode behaviour before changing it.
				retryTime = gl.RemoteConfig.PollOrchestratorInterval.Success
				goto stayIdle
			}
		}
		// err may be nil here, e.g. for an HTTP 500 response.
		logger.Info("Failed to download configuration file", zap.Error(err))

	contError:
		if len(gl.Remotes) != 0 {
			logger.Debug("last successfully fetched target list will be re-used")
			return nil
		}

	stayIdle:
		// Do not proceed until we have attempted to download the config file at least once.
		logger.Info("Retrying configuration download", zap.String("retry_time", retryTime.String()))
		confRetry := time.NewTimer(retryTime)
		select {
		case <-confRetry.C:
			continue
		case <-kill:
			confRetry.Stop()
			logger.Debug("Requested to exit while trying to fetch configuration file.")
			return errors.New("received SIG")
		}
	}
}
// fetchRemoteListFromOrchestrator performs a single GET request against url
// and returns the HTTP status code, the response body and an error.
//
// The body is only read for HTTP 200. HTTP 404 and 400, as well as any status
// not listed below, yield a non-nil error; HTTP 500 is logged as a warning
// and returned with a nil error and an empty body. When the request cannot be
// built or sent, the status code is 0.
//
// The status code and text are attached to a logger derived locally for this
// response; the logger passed in by the caller is left untouched, so these
// fields do not accumulate across calls or leak into unrelated log entries.
func fetchRemoteListFromOrchestrator(
	client *http.Client,
	url string,
	logger *log.Logger,
) (int, []byte, error) {
	// Build the request
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		logger.Warn("NewRequest", zap.Error(err))
		return 0, nil, err
	}

	resp, err := client.Do(req)
	if err != nil {
		logger.Warn("HTTP fetch failure", zap.Error(err))
		return 0, nil, err
	}
	defer resp.Body.Close()

	respLogger := logger.With(
		zap.String("status_text", http.StatusText(resp.StatusCode)),
		zap.Int("status_code", resp.StatusCode))

	var bResp []byte
	switch resp.StatusCode {
	case http.StatusOK:
		respLogger.Debug("HTTP response status code from Orchestrator")
		bResp, err = ioutil.ReadAll(resp.Body)
	case http.StatusNotFound:
		err = errors.New("HTTP response from Orchestrator: 'Idle mode'")
	case http.StatusBadRequest:
		err = errors.New("HTTP response from Orchestrator: 'Please specify hostname or DC!'")
	case http.StatusInternalServerError:
		respLogger.Warn("HTTP response from Orchestrator: Error opening requested configuration file")
	default:
		err = errors.New("unhandled HTTP response from Orchestrator")
	}

	return resp.StatusCode, bResp, err
}
// isTrue reports whether s, compared without regard to letter case, is
// exactly "enabled" or "true". Surrounding whitespace is not trimmed.
func isTrue(s string) bool {
	switch strings.ToLower(s) {
	case "enabled", "true":
		return true
	default:
		return false
	}
}
// readRemoteList decodes the Arachne JSON config file that includes information
// about all the hosts to be tested and validates all IP addresses.
//
// raw is the JSON document. The "local" section is validated and written into
// glRC; the "internal" and "external" target lists are added to remotes via
// walkTargets. maxNumSrcTCPPorts caps the number of source TCP ports and
// minBatchInterval is the shortest accepted batch interval.
//
// An error is returned when the JSON cannot be decoded, no source address can
// be determined, a duration cannot be parsed, the source port range is empty,
// too large, runs past the highest TCP port or contains the target port, the
// batch interval is too short, or a DNS server entry is not a valid IP
// address. glRC may already be partially updated when an error is returned.
func readRemoteList(
	raw []byte,
	glRC *RemoteConfig,
	remotes RemoteStore,
	maxNumSrcTCPPorts uint16,
	minBatchInterval time.Duration,
	logger *log.Logger,
) error {
	// Highest valid TCP port number; ports are 16-bit values.
	const maxTCPPort = 1<<16 - 1

	c := new(RemoteFileConfig)
	if err := json.Unmarshal(raw, c); err != nil {
		return errors.Wrap(err, "configuration file parse error")
	}

	// Populate global variables
	glRC.Location = strings.ToLower(c.Local.Location)
	if glRC.Location == "" {
		logger.Warn("Location not provided in config file")
	}

	glRC.HostName = strings.ToLower(c.Local.HostName)
	if glRC.HostName == "" {
		logger.Debug("Hostname not provided in config file")
		// GetHostname logs the failure itself and falls back to "unknown".
		glRC.HostName, _ = GetHostname(logger)
	} else {
		logger.Info("Remotely assigned hostname for metrics uploading",
			zap.String("metrics_hostname", glRC.HostName))
	}

	glRC.InterfaceName = strings.ToLower(c.Local.InterfaceName)
	switch {
	case runtime.GOOS == "linux" && strings.Contains(glRC.InterfaceName, "en"),
		runtime.GOOS == "darwin" && strings.Contains(glRC.InterfaceName, "eth"):
		logger.Warn("Specified interface may not be applicable to actual OS",
			zap.String("interface", glRC.InterfaceName),
			zap.String("OS", runtime.GOOS))
	}

	// Try IPv4 first and fall back to IPv6.
	srcIP, err := network.GetSourceAddr("ip4", strings.ToLower(c.Local.SrcAddress),
		glRC.HostName, glRC.InterfaceName, logger)
	if err != nil {
		srcIP, err = network.GetSourceAddr("ip6", strings.ToLower(c.Local.SrcAddress),
			glRC.HostName, glRC.InterfaceName, logger)
		if err != nil {
			return errors.Wrap(err, "could not retrieve an IPv4 or IPv6 source address")
		}
	}
	glRC.SrcAddress = *srcIP
	logger.Debug("Arachne agent's source IP address", zap.Any("address", glRC.SrcAddress))

	glRC.TargetTCPPort = c.Local.TargetTCPPort
	if glRC.Timeout, err = time.ParseDuration(c.Local.Timeout); err != nil {
		return errors.Wrap(err, "failed to parse the timeout")
	}

	glRC.SrcTCPPortRange[0] = c.Local.BaseSrcTCPPort
	if c.Local.NumSrcTCPPorts > maxNumSrcTCPPorts {
		return errors.Errorf("not more than %d ephemeral source TCP ports may be used",
			maxNumSrcTCPPorts)
	}
	if c.Local.NumSrcTCPPorts == 0 {
		return errors.New("cannot specify zero source TCP ports")
	}
	// Compute the last port in a wider type: in 16-bit arithmetic the sum
	// silently wraps around and yields a range that ends before it starts.
	lastSrcPort := uint32(c.Local.BaseSrcTCPPort) + uint32(c.Local.NumSrcTCPPorts) - 1
	if lastSrcPort > maxTCPPort {
		return errors.Errorf("the ephemeral source TCP port range starting at %d with %d ports "+
			"exceeds the highest TCP port %d",
			c.Local.BaseSrcTCPPort, c.Local.NumSrcTCPPorts, maxTCPPort)
	}
	glRC.SrcTCPPortRange[1] = layers.TCPPort(lastSrcPort)
	if glRC.SrcTCPPortRange.Contains(glRC.TargetTCPPort) {
		return errors.Errorf("the listen TCP port cannot reside in the range of the ephemeral TCP "+
			"source ports [%d-%d]", glRC.SrcTCPPortRange[0], glRC.SrcTCPPortRange[1])
	}

	if glRC.BatchInterval, err = time.ParseDuration(c.Local.BatchInterval); err != nil {
		return errors.Wrap(err, "failed to parse the batch interval")
	}
	if glRC.BatchInterval < minBatchInterval {
		return errors.Errorf("the batch cycle interval cannot be shorter than %v", minBatchInterval)
	}

	if glRC.PollOrchestratorInterval.Success, err =
		time.ParseDuration(c.Local.PollOrchestratorIntervalSuccess); err != nil {
		return errors.Wrap(err, "failed to parse the Orchestrator poll interval for success")
	}
	if glRC.PollOrchestratorInterval.Failure, err =
		time.ParseDuration(c.Local.PollOrchestratorIntervalFailure); err != nil {
		return errors.Wrap(err, "failed to parse the Orchestrator poll interval for failure")
	}

	glRC.QoSEnabled = isTrue(c.Local.QoSEnabled)
	glRC.ResolveDNS = isTrue(c.Local.ResolveDNS)

	// Replace (rather than append to) the server list, and only once every
	// entry has parsed, so a retried parse cannot leave duplicate or partial
	// entries behind.
	dnsServers, err := parseDNSServers(c.Local.DNSServersAlt)
	if err != nil {
		return err
	}
	glRC.DNSServersAlt = dnsServers
	logger.Debug("Alternate DNS servers configured", zap.Any("servers", glRC.DNSServersAlt))

	walkTargets(glRC, c.Internal, false, remotes, logger)
	walkTargets(glRC, c.External, true, remotes, logger)

	for key, r := range remotes {
		logger.Debug("Remote", zap.String("key", key), zap.Any("object", r))
	}

	return nil
}

// parseDNSServers converts a comma-separated list of IP addresses into a
// slice of net.IP. Whitespace around each entry is ignored. Any entry that is
// not a valid IP address, including an empty one, results in an error that
// quotes the offending entry.
func parseDNSServers(list string) ([]net.IP, error) {
	entries := strings.Split(list, ",")
	servers := make([]net.IP, 0, len(entries))
	for _, entry := range entries {
		server := strings.TrimSpace(entry)
		ip := net.ParseIP(server)
		if ip == nil {
			return nil, errors.Errorf("configuration file parse error: "+
				"invalid IP address for DNS server: %q", server)
		}
		servers = append(servers, ip)
	}
	return servers, nil
}
// walkTargets validates the targets in ts and adds them to remotes, keyed by
// the textual form of their IP address.
//
// When grc.ResolveDNS is set and a target has a host name, the first address
// returned by the DNS lookup replaces the configured IP. Targets are skipped
// (and the reason logged) when the lookup fails or returns nothing, when the
// address is not a valid IP, or when the address equals grc.SrcAddress, i.e.
// the local agent itself. ext is stored as the External flag of every Remote
// added.
func walkTargets(grc *RemoteConfig, ts []target, ext bool, remotes RemoteStore, logger *log.Logger) {
	for _, t := range ts {
		if grc.ResolveDNS && t.HostName != "" {
			addrs, err := net.LookupHost(t.HostName)
			if err != nil {
				logger.Error("failed to DNS resolve hostname",
					zap.String("hostname", t.HostName),
					zap.Error(err))
				continue
			}
			if len(addrs) == 0 {
				logger.Error("DNS resolution returned no addresses",
					zap.String("hostname", t.HostName))
				continue
			}
			t.IP = addrs[0]
		}

		// Validate address string
		currIP := net.ParseIP(t.IP)
		if currIP == nil {
			logger.Error("configuration file parse error",
				zap.String("err", "invalid IP address for host"),
				zap.String("hostname", t.HostName),
				zap.String("ip", t.IP))
			// Without this, a nil IP would be stored under the key "<nil>".
			continue
		}

		if currIP.Equal(grc.SrcAddress) {
			logger.Debug("Local server's address not added in remote target list",
				zap.String("JSON_source_address", grc.SrcAddress.String()),
				zap.String("target", currIP.String()))
			continue
		}

		remotes[currIP.String()] = Remote{
			IP:       currIP,
			AF:       network.Family(&currIP),
			Hostname: t.HostName,
			Location: t.Location,
			External: ext,
		}
	}
}
// ResolveDNSTargets resolves the DNS names of the IP addresses of all echo targets and the localhost.
//
// It starts a goroutine and returns immediately. The goroutine first resolves
// grc.SrcAddress: the result fills grc.HostName when that is empty, otherwise
// a warning is logged if the two differ. It then refreshes every entry of
// remotes, re-resolving the host name of non-external targets when
// grc.ResolveDNS is set, and repeats that on every tick of DNSRefresh until a
// value is received on kill, at which point DNSRefresh is stopped.
//
// wg.Done is called exactly once, after the first pass over remotes, so the
// caller is expected to have called wg.Add(1) once. Calling it after every
// pass would decrement the counter more often than it was incremented.
//
// NOTE(review): the goroutine writes to remotes and grc.HostName without
// synchronization; presumably callers avoid touching them concurrently —
// confirm.
func ResolveDNSTargets(
	remotes RemoteStore,
	grc *RemoteConfig,
	DNSRefresh *time.Ticker,
	wg *sync.WaitGroup,
	kill chan struct{},
	logger *log.Logger,
) {
	go func() {
		// Resolve
		if localHost, err := network.ResolveIP(grc.SrcAddress.String(),
			grc.DNSServersAlt, logger); err == nil {
			if grc.HostName == "" {
				grc.HostName = localHost
			} else if grc.HostName != strings.ToLower(localHost) {
				logger.Warn("DNS-resolved local hostname is different from "+
					"configured local hostname",
					zap.String("DNS-resolved_hostname", localHost),
					zap.String("configured_hostname", grc.HostName))
			}
		}

		firstPass := true
		for {
			// Only existing keys are reassigned, which is permitted while
			// ranging over a map.
			for addressKey, remote := range remotes {
				// Do not update hostname for external targets
				if !remote.External && grc.ResolveDNS {
					if h, err := network.ResolveIP(addressKey, grc.DNSServersAlt,
						logger); err == nil {
						remote.Hostname = h
						logger.Debug("DNS resolution",
							zap.String("address", addressKey),
							zap.String("hostname", remote.Hostname))
					}
				}

				currIP := net.ParseIP(addressKey)
				remote.IP = currIP
				remote.AF = network.Family(&currIP)
				remotes[addressKey] = remote
			}

			// Release the waiter once the initial resolution is complete.
			if firstPass {
				wg.Done()
				firstPass = false
			}

			select {
			case <-DNSRefresh.C:
				continue
			case <-kill:
				DNSRefresh.Stop()
				logger.Debug("ResolveDNSTargets goroutine returning")
				return
			}
		}
	}()
}