agent/envoy_bootstrap/envoy_bootstrap.go (1,395 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package bootstrap
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/fs"
"io/ioutil"
"math"
"net"
"os"
"path/filepath"
"reflect"
"regexp"
"strconv"
"strings"
"time"
"github.com/aws/aws-app-mesh-agent/agent/config"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/applicationinfo"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/env"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/listenerinfo"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/mesh_resource"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/metric_filter"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/netinfo"
"github.com/aws/aws-app-mesh-agent/agent/envoy_bootstrap/platforminfo"
sdkConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-xray-sdk-go/strategy/sampling"
accesslog "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
boot "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
grpc_cred "github.com/envoyproxy/go-control-plane/envoy/config/grpc_credential/v3"
metrics "github.com/envoyproxy/go-control-plane/envoy/config/metrics/v3"
trace "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3"
file_access_log "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
"github.com/stoewer/go-strcase"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
var (
// ipPortRegex matches the ":<port>" part (one to five digits, captured) of an IPv4 or IPv6
// address. The pattern is not anchored. It is compiled eagerly with MustCompile so that an
// invalid pattern panics at package initialization rather than at first use.
ipPortRegex = regexp.MustCompile("(?:(?:.+)|(?:::.+)):([0-9]{1,5})")
)
const (
// NOTE(review): the namespace and the four "*Key" constants below are not referenced in the
// part of the file visible here; the key values look like lowerCamelCase bootstrap section
// names — confirm against their callers.
runtimeMetadataNamespace = "aws.appmesh.static_runtime"
staticResourcesKey = "staticResources"
tracingKey = "tracing"
statsConfigKey = "statsConfig"
statsSinksKey = "statsSinks"
// Integer channel arguments set on the ADS Google gRPC service under the keys
// "grpc.http2.max_pings_without_data", "grpc.keepalive_time_ms" and
// "grpc.keepalive_timeout_ms" respectively.
GRPC_MAX_PINGS_WITHOUT_DATA = 0
GRPC_KEEPALIVE_TIME_MS = 10000
GRPC_KEEPALIVE_TIMEOUT_MS = 20000
// Regexes over stat names shaped like "<prefix>.ingress.<protocol>.<port>.<rest>".
// listenerProtocolRegex captures the protocol segment and listenerPortRegex the port segment;
// both are used as metrics tag-specifier regexes. envoyRdsRouteConf captures the segment
// that follows ".rds." (its use is outside the visible part of the file).
listenerProtocolRegex = ".*?\\.ingress\\.((\\w+?)\\.)[0-9]+?\\.(.+?)$"
listenerPortRegex = ".*?\\.ingress\\.\\w+?\\.(([0-9]+?)\\.)(.+?)$"
envoyRdsRouteConf = ".*?\\.ingress\\.\\w+?\\.[0-9]+?\\.rds\\.((.*?)\\.)(.+?)$"
)
// EnvoyCLI is the seam through which this package invokes the Envoy command line.
// It is implemented in this file by envoyCLI.
type EnvoyCLI interface {
// run invokes the CLI with the given arguments and returns a string result
// (used by isFipsCompatible as the "--version" output) or an error.
run(args ...string) (string, error)
}
// envoyCLI is the EnvoyCLI implementation that runs a real command.
type envoyCLI struct {
// CommandPath is passed to platforminfo.RunCommand as the command to execute.
CommandPath string
}
// run executes e.CommandPath with args through platforminfo.RunCommand and hands back
// that call's string result and error unchanged.
func (e *envoyCLI) run(args ...string) (string, error) {
	output, err := platforminfo.RunCommand(e.CommandPath, args...)
	return output, err
}
// FileUtil is the file-access seam used by this package (for example when reading or
// writing the X-Ray sampling rule manifest). It is implemented in this file by fileUtil.
type FileUtil interface {
// Read returns the contents of the file at path.
Read(path string) ([]byte, error)
// Write stores data in the file at path using the permission bits perm.
Write(path string, data []byte, perm fs.FileMode) error
}
// fileUtil is the stateless FileUtil implementation backed by the ioutil package.
type fileUtil struct{}
// Read loads the whole file at path via ioutil.ReadFile and returns its bytes and error as-is.
func (f *fileUtil) Read(path string) ([]byte, error) {
	contents, err := ioutil.ReadFile(path)
	return contents, err
}
// Write stores data at path with permission bits perm via ioutil.WriteFile.
func (f *fileUtil) Write(path string, data []byte, perm fs.FileMode) error {
	if err := ioutil.WriteFile(path, data, perm); err != nil {
		return err
	}
	return nil
}
// getRuntimeConfigLayer0 assembles the key/value pairs handed to Envoy as its initial
// runtime configuration. Start-up feature opt-ins/opt-outs and special-case workarounds
// are collected here.
//
// Any error reported by the env helpers while reading an environment variable is
// returned unchanged, together with a nil map.
func getRuntimeConfigLayer0() (map[string]interface{}, error) {
	tracingDecision, err := env.TruthyOrElse("APPMESH_SET_TRACING_DECISION", true)
	if err != nil {
		return nil, err
	}
	noExtensionLookupByName, err := env.TruthyOrElse("ENVOY_NO_EXTENSION_LOOKUP_BY_NAME", true)
	if err != nil {
		return nil, err
	}
	httpClientForAwsCredentials, err := env.TruthyOrElse("ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS", config.ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS_DEFAULT)
	if err != nil {
		return nil, err
	}

	// ====== Entries that are always present ======
	layer := make(map[string]interface{})

	// Let Envoy accept every deprecated feature, so config that is on its way out
	// produces neither warnings nor hard errors.
	layer["envoy.features.enable_all_deprecated_features"] = true

	// Permit RE2 regexes of effectively any complexity.
	layer["re2.max_program_size.error_level"] = 1000

	// Temporary hack flag: tells Envoy not to mutate tracing headers it did not originate.
	layer["envoy.reloadable_features.http_set_tracing_decision_in_request_id"] = tracingDecision

	// Defaults to true. When true, Envoy NACKs config whose extension is missing its type URL;
	// when false, Envoy still falls back to looking extensions up by name.
	// See https://www.envoyproxy.io/docs/envoy/latest/version_history/v1.22.0#minor-behavior-changes
	layer["envoy.reloadable_features.no_extension_lookup_by_name"] = noExtensionLookupByName

	// Default comes from config.ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS_DEFAULT.
	// Envoy can fetch AWS metadata credentials with its HTTP async client instead of libcurl,
	// as part of deprecating libcurl inside Envoy. See:
	//   https://github.com/envoyproxy/envoy/pull/29880
	//   https://github.com/envoyproxy/envoy/pull/30626
	//   https://github.com/envoyproxy/envoy/pull/30731
	//   https://github.com/envoyproxy/envoy/pull/31135
	layer["envoy.reloadable_features.use_http_client_to_fetch_aws_credentials"] = httpClientForAwsCredentials

	// ====== Entries that are only present when explicitly configured ======

	// http.max_requests_per_io_cycle caps how many HTTP requests from one connection are
	// processed in a single I/O cycle; the remainder is handled in later cycles. This keeps
	// connections that send bursts of requests from starving others of CPU. It can be set
	// to 1 when abusive HTTP/2 or HTTP/3 connections are present. The limit is left unset
	// unless MAX_REQUESTS_PER_IO_CYCLE is a positive integer.
	maxRequestsPerIoCycle, err := env.OrInt("MAX_REQUESTS_PER_IO_CYCLE", -1)
	if err != nil {
		return nil, err
	}
	if maxRequestsPerIoCycle > 0 {
		layer["http.max_requests_per_io_cycle"] = maxRequestsPerIoCycle
	}

	return layer, nil
}
// getMeshResourceFromNodeId splits nodeId on "/" and maps the pieces onto a MeshResource.
//
// nodeId does not have to be a complete ARN; both of these shapes are accepted:
//  1. arn:aws:appmesh:...:mesh/meshName/resourceType/resourceName
//  2. mesh/meshName/resourceType/resourceName
//
// An error is returned when nodeId has fewer than four "/"-separated parts.
func getMeshResourceFromNodeId(nodeId string) (*mesh_resource.MeshResource, error) {
	parts := strings.Split(nodeId, "/")
	if len(parts) < 4 {
		return nil, fmt.Errorf("Unrecognized resource name format: %s", nodeId)
	}

	meshName, resourceType, resourceName := parts[1], parts[2], parts[3]

	res := &mesh_resource.MeshResource{}
	res.MeshName = meshName
	res.Type = resourceType
	res.UpperCamelCaseType = strcase.UpperCamelCase(resourceType)
	res.SnakeCaseType = strcase.SnakeCase(resourceType)
	res.Name = resourceName
	return res, nil
}
// mergeDstMapInterfaceWithSrc copies every entry of src into dst under the lowerCamelCase
// form of its key and returns dst. When dst already holds that key, the old and new values
// are logged as warnings and the new value wins.
//
// src is normalized in place along the way (snake_case keys are renamed to lowerCamelCase).
// If src carries both spellings of one key, normalization fails and (nil, err) is returned.
func mergeDstMapInterfaceWithSrc(dst, src map[string]interface{}) (map[string]interface{}, error) {
	// Snapshot the keys first: normalization below mutates src, so ranging over src
	// directly while renaming its keys must be avoided.
	srcKeys := make([]string, 0, len(src))
	for k := range src {
		srcKeys = append(srcKeys, k)
	}

	for _, srcKey := range srcKeys {
		incomingValue := src[srcKey]
		camelKey := strcase.LowerCamelCase(srcKey)

		// Fails when the input has both snake_case and lowerCamelCase for the same key.
		_, err := normalizeMapKeyToCamelCase(src, srcKey)
		if err != nil {
			return nil, err
		}

		if previous, found := dst[camelKey]; found {
			// Overwriting an existing entry: surface both versions so the replacement is visible.
			oldYaml, _ := yaml.Marshal(map[string]interface{}{camelKey: previous})
			newYaml, _ := yaml.Marshal(map[string]interface{}{srcKey: incomingValue})
			log.Warnf("replacing an existing %s config", camelKey)
			log.Warnf("==OLD==\n---\n%s", string(oldYaml))
			log.Warnf("==NEW==\n---\n%s", string(newYaml))
		}

		dst[camelKey] = incomingValue
	}
	return dst, nil
}
// extendDstMapInterfaceWithSrcForAKey folds src[key] into dst[key] and returns dst.
//
//   - src has no such key: dst is returned untouched.
//   - only src has the key: the value is copied into dst.
//   - both have the key:    the two values are concatenated with reflect.AppendSlice, which
//     panics unless both are slices of the same type.
func extendDstMapInterfaceWithSrcForAKey(dst, src map[string]interface{}, key string) map[string]interface{} {
	addition, inSrc := src[key]
	if !inSrc {
		return dst
	}

	current, inDst := dst[key]
	if !inDst {
		dst[key] = addition
		return dst
	}

	combined := reflect.AppendSlice(reflect.ValueOf(current), reflect.ValueOf(addition))
	dst[key] = combined.Interface()
	return dst
}
// getRegion resolves the AWS region to use.
//
// The AWS_REGION environment variable wins when it is non-empty. Otherwise the default AWS
// SDK configuration is loaded and the region is requested from the EC2 Metadata Service.
//
// Errors from loading the SDK configuration are returned as-is; an error from the metadata
// lookup is wrapped (with %w, so callers can still inspect it via errors.Is / errors.As).
func getRegion() (string, error) {
	if r := env.Get("AWS_REGION"); r != "" {
		return r, nil
	}
	log.Info("AWS_REGION environment variable is not set. Will fetch region from EC2 Metadata Service...")

	ctx := context.TODO()
	cfg, err := sdkConfig.LoadDefaultConfig(ctx)
	if err != nil {
		return "", err
	}

	client := imds.NewFromConfig(cfg)
	region, err := client.GetRegion(ctx, &imds.GetRegionInput{})
	if err != nil {
		// Wrap rather than flatten the cause, and do not end the message with a newline.
		return "", fmt.Errorf("unable to retrieve the region from the EC2 Metadata Service: %w", err)
	}
	return region.Region, nil
}
// getXdsDomain returns the DNS suffix of the xDS endpoint for region.
//
// Regions whose name starts with "cn-" (China) live under a different domain, see
// https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-arns.html
// and dualstack endpoints use their own pair of domains.
func getXdsDomain(region string, dualstack bool) string {
	inChina := strings.HasPrefix(region, "cn-")
	switch {
	case dualstack && inChina:
		return "api.amazonwebservices.com.cn"
	case dualstack:
		return "api.aws"
	case inChina:
		return "amazonaws.com.cn"
	default:
		return "amazonaws.com"
	}
}
// isFipsCompatible reports whether the Envoy version string (from `--version`) contains
// "fips", case-insensitively; callers use this to decide on the `-fips` endpoint suffix.
//
// If the version cannot be determined, a warning is logged and the version is treated as
// "unknown". For regions starting with "us-gov-" or "us-iso" a non-FIPS build is an error.
func isFipsCompatible(region string, envoyCLIInst EnvoyCLI) (bool, error) {
	version, err := envoyCLIInst.run("--version")
	if err != nil {
		log.Warnf("Could not determine envoy version: %v", err)
		version = "unknown"
	}

	fipsBuild := strings.Contains(strings.ToLower(version), "fips")
	fipsRequired := strings.HasPrefix(region, "us-gov-") || strings.HasPrefix(region, "us-iso")

	// Images running in us-gov-* or us-iso* must be FIPS compatible.
	if fipsRequired && !fipsBuild {
		return false, fmt.Errorf("envoy version: %s is not FIPS compatible for region %s", version, region)
	}

	// Note: a FIPS image cannot connect to the xDS endpoint in regions that have no FIPS endpoint.
	return fipsBuild, nil
}
// getRegionalXdsEndpoint returns the "host:443" xDS endpoint for region.
//
// A non-empty APPMESH_XDS_ENDPOINT is returned verbatim. Otherwise the host name is derived
// from APPMESH_PREVIEW (preview service name), APPMESH_DUALSTACK_ENDPOINT (domain choice)
// and whether the Envoy build is FIPS (adds the "-fips" suffix, except in us-gov-* regions).
func getRegionalXdsEndpoint(region string, envoyCLIInst EnvoyCLI) (*string, error) {
	if override := env.Get("APPMESH_XDS_ENDPOINT"); override != "" {
		return &override, nil
	}

	preview, err := env.Truthy("APPMESH_PREVIEW")
	if err != nil {
		return nil, err
	}
	dualstack, err := env.Truthy("APPMESH_DUALSTACK_ENDPOINT")
	if err != nil {
		return nil, err
	}
	fips, err := isFipsCompatible(region, envoyCLIInst)
	if err != nil {
		return nil, err
	}

	// TODO: the App Mesh FIPS xDS endpoint in GovCloud regions has no `-fips` suffix. Should
	// App Mesh ever add `-fips` endpoints there, drop this special case and use `fips` directly.
	useFips := fips && !strings.HasPrefix(region, "us-gov-")

	domain := getXdsDomain(region, dualstack)

	var target string
	switch {
	case preview && useFips:
		target = fmt.Sprintf("appmesh-preview-envoy-management-fips.%s.%s:443", region, domain)
	case preview:
		target = fmt.Sprintf("appmesh-preview-envoy-management.%s.%s:443", region, domain)
	case useFips:
		target = fmt.Sprintf("appmesh-envoy-management-fips.%s.%s:443", region, domain)
	default:
		target = fmt.Sprintf("appmesh-envoy-management.%s.%s:443", region, domain)
	}
	return &target, nil
}
// getSigningName returns the service name used for request signing: APPMESH_SIGNING_NAME
// when it is non-empty, otherwise "appmesh-preview" or "appmesh" depending on APPMESH_PREVIEW.
func getSigningName() (string, error) {
	if override := env.Get("APPMESH_SIGNING_NAME"); override != "" {
		return override, nil
	}

	preview, err := env.Truthy("APPMESH_PREVIEW")
	switch {
	case err != nil:
		return "", err
	case preview:
		return "appmesh-preview", nil
	default:
		return "appmesh", nil
	}
}
// getNodeId returns the first non-empty value among APPMESH_RESOURCE_ARN (preferred),
// APPMESH_RESOURCE_NAME and APPMESH_VIRTUAL_NODE_NAME (the latter two kept for
// compatibility), checked in that order. It is an error for all three to be empty.
func getNodeId() (string, error) {
	candidates := []string{
		"APPMESH_RESOURCE_ARN",
		"APPMESH_RESOURCE_NAME",
		"APPMESH_VIRTUAL_NODE_NAME",
	}
	for _, name := range candidates {
		if v := env.Get(name); v != "" {
			return v, nil
		}
	}
	return "", errors.New("APPMESH_RESOURCE_ARN environment variable must be set.")
}
// getNodeCluster returns the node's cluster name: APPMESH_RESOURCE_CLUSTER if set, else
// APPMESH_VIRTUAL_NODE_CLUSTER (kept for compatibility), else nodeId itself.
func getNodeCluster(nodeId string) string {
	for _, name := range []string{"APPMESH_RESOURCE_CLUSTER", "APPMESH_VIRTUAL_NODE_CLUSTER"} {
		if v := env.Get(name); v != "" {
			return v
		}
	}
	return nodeId
}
// normalizeMapKeyToCamelCase rewrites m in place so that `key` is present only in its
// lowerCamelCase spelling, and returns m.
//
// m may hold the key as lowerCamelCase or as snake_case. Holding both spellings at once
// (e.g. 'statsConfig' and 'stats_config') is rejected with an error and a nil map.
func normalizeMapKeyToCamelCase(m map[string]interface{}, key string) (map[string]interface{}, error) {
	snakeKey := strcase.SnakeCase(key)
	camelKey := strcase.LowerCamelCase(key)

	// Single-word keys (e.g. "tracing") read the same in both conventions.
	if snakeKey == camelKey {
		return m, nil
	}

	snakeValue, hasSnake := m[snakeKey]
	if !hasSnake {
		// No snake_case entry, so there is nothing to rename.
		return m, nil
	}

	if _, hasCamel := m[camelKey]; hasCamel {
		return nil, fmt.Errorf("the config contains both %s(lowerCamelCase) & %s(snake_case), specify only one of them\n", camelKey, snakeKey)
	}

	// Move the snake_case entry over to its lowerCamelCase name.
	m[camelKey] = snakeValue
	delete(m, snakeKey)
	return m, nil
}
// buildTcpSocketAddr wraps addr:port in a core.Address socket address, leaving the protocol
// at its default and setting Ipv4Compat to v4compat. port is converted to uint32 as-is.
func buildTcpSocketAddr(addr string, port int, v4compat bool) *core.Address {
	socket := &core.SocketAddress{
		Address:       addr,
		PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(port)},
		Ipv4Compat:    v4compat,
	}
	return &core.Address{
		Address: &core.Address_SocketAddress{SocketAddress: socket},
	}
}
// buildUdpSocketAddr wraps addr:port in a core.Address socket address whose protocol is UDP.
// port is converted to uint32 as-is.
func buildUdpSocketAddr(addr string, port int) *core.Address {
	socket := &core.SocketAddress{
		Protocol:      core.SocketAddress_UDP,
		Address:       addr,
		PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(port)},
	}
	return &core.Address{
		Address: &core.Address_SocketAddress{SocketAddress: socket},
	}
}
// buildSocketPipe returns a core.Address that refers to the pipe at udsPath.
func buildSocketPipe(udsPath string) *core.Address {
	pipe := &core.Pipe{Path: udsPath}
	return &core.Address{
		Address: &core.Address_Pipe{Pipe: pipe},
	}
}
// buildSocketPipe_WithMode returns a core.Address that refers to the pipe at udsPath and
// carries mode in the pipe's Mode field.
func buildSocketPipe_WithMode(udsPath string, mode uint32) *core.Address {
	pipe := &core.Pipe{
		Path: udsPath,
		Mode: mode,
	}
	return &core.Address{
		Address: &core.Address_Pipe{Pipe: pipe},
	}
}
// buildAdminAccessLogConfig returns a file access-log configuration for the Envoy admin
// interface. The log path is ENVOY_ADMIN_ACCESS_LOG_FILE, or "/tmp/envoy_admin_access.log"
// when that variable is unset.
func buildAdminAccessLogConfig() (*accesslog.AccessLog, error) {
	fileLog := &file_access_log.FileAccessLog{
		Path: env.Or("ENVOY_ADMIN_ACCESS_LOG_FILE", "/tmp/envoy_admin_access.log"),
	}

	packed, err := anypb.New(fileLog)
	if err != nil {
		return nil, err
	}

	return &accesslog.AccessLog{
		ConfigType: &accesslog.AccessLog_TypedConfig{TypedConfig: packed},
	}, nil
}
// buildAdmin builds the bootstrap `admin` section.
//
// In UDS mode the admin interface listens on config.ENVOY_ADMIN_UDS_PATH with the default
// file mode. In every other mode it listens on TCP: port ENVOY_ADMIN_ACCESS_PORT (default
// 9901) on "::" when ENVOY_ADMIN_ACCESS_ENABLE_IPV6 is truthy (with IPv4 compatibility
// enabled), or on "0.0.0.0" otherwise.
func buildAdmin(agentConfig config.AgentConfig) (*boot.Admin, error) {
	accessLogConfig, err := buildAdminAccessLogConfig()
	if err != nil {
		return nil, err
	}
	accessLogs := []*accesslog.AccessLog{accessLogConfig}

	if agentConfig.EnvoyAdminMode == config.UDS {
		return &boot.Admin{
			AccessLog: accessLogs,
			Address:   buildSocketPipe_WithMode(config.ENVOY_ADMIN_UDS_PATH, config.ENVOY_ADMIN_UDS_FILE_MODE_DEFAULT),
		}, nil
	}

	port, err := env.OrInt("ENVOY_ADMIN_ACCESS_PORT", 9901)
	if err != nil {
		return nil, err
	}
	enableIPv6, err := env.Truthy("ENVOY_ADMIN_ACCESS_ENABLE_IPV6")
	if err != nil {
		return nil, err
	}

	listenAddr := "0.0.0.0"
	if enableIPv6 {
		listenAddr = "::"
	}

	return &boot.Admin{
		AccessLog: accessLogs,
		Address:   buildTcpSocketAddr(listenAddr, port, enableIPv6),
	}, nil
}
// buildNode returns a core.Node populated with the given id, cluster and metadata.
func buildNode(id string, cluster string, metadata *structpb.Struct) *core.Node {
	node := new(core.Node)
	node.Id = id
	node.Cluster = cluster
	node.Metadata = metadata
	return node
}
// generateStaticRuntimeLayer wraps s in a runtime layer named "static_layer_0".
func generateStaticRuntimeLayer(s *structpb.Struct) *boot.RuntimeLayer {
	specifier := &boot.RuntimeLayer_StaticLayer{StaticLayer: s}
	return &boot.RuntimeLayer{
		Name:           "static_layer_0",
		LayerSpecifier: specifier,
	}
}
// generateEmptyAdminLayer returns a runtime layer named "admin_layer" whose admin-layer
// specifier is present but carries no AdminLayer message (nil).
func generateEmptyAdminLayer() *boot.RuntimeLayer {
	specifier := &boot.RuntimeLayer_AdminLayer_{}
	return &boot.RuntimeLayer{
		Name:           "admin_layer",
		LayerSpecifier: specifier,
	}
}
// buildLayeredRuntime builds the bootstrap `layered_runtime` section: the static layer
// produced from getRuntimeConfigLayer0, followed by an empty admin layer.
func buildLayeredRuntime() (*boot.LayeredRuntime, error) {
	layer0, err := getRuntimeConfigLayer0()
	if err != nil {
		return nil, err
	}

	staticValues, err := structpb.NewStruct(layer0)
	if err != nil {
		return nil, err
	}

	layers := []*boot.RuntimeLayer{
		generateStaticRuntimeLayer(staticValues),
		generateEmptyAdminLayer(),
	}
	return &boot.LayeredRuntime{Layers: layers}, nil
}
// buildClusterManager builds the bootstrap `cluster_manager` section. Outlier detection
// events are logged to ENVOY_OUTLIER_DETECTION_EVENT_LOG_PATH, or "/dev/stdout" by default.
func buildClusterManager() *boot.ClusterManager {
	outlierDetection := &boot.ClusterManager_OutlierDetection{
		EventLogPath: env.Or("ENVOY_OUTLIER_DETECTION_EVENT_LOG_PATH", "/dev/stdout"),
	}
	return &boot.ClusterManager{OutlierDetection: outlierDetection}
}
// buildFileDataSource returns a core.DataSource that points at the file named filename.
func buildFileDataSource(filename string) *core.DataSource {
	source := &core.DataSource{}
	source.Specifier = &core.DataSource_Filename{Filename: filename}
	return source
}
// buildGoogleGrpcIntChannelArg wraps value as an integer Google gRPC channel argument.
func buildGoogleGrpcIntChannelArg(value int64) *core.GrpcService_GoogleGrpc_ChannelArgs_Value {
	arg := &core.GrpcService_GoogleGrpc_ChannelArgs_Value{}
	arg.ValueSpecifier = &core.GrpcService_GoogleGrpc_ChannelArgs_Value_IntValue{IntValue: value}
	return arg
}
// buildAdsGrpcServiceForRelayEndpoint returns the Google gRPC service definition (stat
// prefix "ads") targeting the relay endpoint. Only keepalive/ping channel arguments are
// set; no channel or call credentials are configured. The returned error is always nil.
func buildAdsGrpcServiceForRelayEndpoint(endpoint string) (*core.GrpcService, error) {
	keepaliveArgs := &core.GrpcService_GoogleGrpc_ChannelArgs{
		Args: map[string]*core.GrpcService_GoogleGrpc_ChannelArgs_Value{
			"grpc.http2.max_pings_without_data": buildGoogleGrpcIntChannelArg(GRPC_MAX_PINGS_WITHOUT_DATA),
			"grpc.keepalive_time_ms":            buildGoogleGrpcIntChannelArg(GRPC_KEEPALIVE_TIME_MS),
			"grpc.keepalive_timeout_ms":         buildGoogleGrpcIntChannelArg(GRPC_KEEPALIVE_TIMEOUT_MS),
		},
	}

	googleGrpc := &core.GrpcService_GoogleGrpc{
		StatPrefix:  "ads",
		TargetUri:   endpoint,
		ChannelArgs: keepaliveArgs,
	}

	return &core.GrpcService{
		TargetSpecifier: &core.GrpcService_GoogleGrpc_{GoogleGrpc: googleGrpc},
	}, nil
}
// buildRegionalAdsGrpcService returns the Google gRPC service definition (stat prefix "ads")
// targeting a regional xDS endpoint.
//
// The channel uses SSL credentials with root certificates from /etc/pki/tls/cert.pem, and
// call credentials come from the "envoy.grpc_credentials.aws_iam" plugin configured with
// signingName as the service name and region as the region. The same keepalive/ping
// channel arguments as the relay variant are applied.
func buildRegionalAdsGrpcService(endpoint string, region string, signingName string) (*core.GrpcService, error) {
	const iamPlugin = "envoy.grpc_credentials.aws_iam"

	keepaliveArgs := &core.GrpcService_GoogleGrpc_ChannelArgs{
		Args: map[string]*core.GrpcService_GoogleGrpc_ChannelArgs_Value{
			"grpc.http2.max_pings_without_data": buildGoogleGrpcIntChannelArg(GRPC_MAX_PINGS_WITHOUT_DATA),
			"grpc.keepalive_time_ms":            buildGoogleGrpcIntChannelArg(GRPC_KEEPALIVE_TIME_MS),
			"grpc.keepalive_timeout_ms":         buildGoogleGrpcIntChannelArg(GRPC_KEEPALIVE_TIMEOUT_MS),
		},
	}

	iamConfig, err := anypb.New(&grpc_cred.AwsIamConfig{
		ServiceName: signingName,
		Region:      region,
	})
	if err != nil {
		return nil, err
	}

	channelCreds := &core.GrpcService_GoogleGrpc_ChannelCredentials{
		CredentialSpecifier: &core.GrpcService_GoogleGrpc_ChannelCredentials_SslCredentials{
			SslCredentials: &core.GrpcService_GoogleGrpc_SslCredentials{
				RootCerts: buildFileDataSource("/etc/pki/tls/cert.pem"),
			},
		},
	}

	iamCallCreds := &core.GrpcService_GoogleGrpc_CallCredentials{
		CredentialSpecifier: &core.GrpcService_GoogleGrpc_CallCredentials_FromPlugin{
			FromPlugin: &core.GrpcService_GoogleGrpc_CallCredentials_MetadataCredentialsFromPlugin{
				Name: iamPlugin,
				ConfigType: &core.GrpcService_GoogleGrpc_CallCredentials_MetadataCredentialsFromPlugin_TypedConfig{
					TypedConfig: iamConfig,
				},
			},
		},
	}

	googleGrpc := &core.GrpcService_GoogleGrpc{
		StatPrefix:             "ads",
		TargetUri:              endpoint,
		ChannelCredentials:     channelCreds,
		CredentialsFactoryName: iamPlugin,
		CallCredentials:        []*core.GrpcService_GoogleGrpc_CallCredentials{iamCallCreds},
		ChannelArgs:            keepaliveArgs,
	}

	return &core.GrpcService{
		TargetSpecifier: &core.GrpcService_GoogleGrpc_{GoogleGrpc: googleGrpc},
	}, nil
}
// buildAdsConfigSource returns the config source that points a discovery service such as
// LDS or CDS at ADS, using the V3 resource API. The initial fetch timeout is taken from
// ENVOY_INITIAL_FETCH_TIMEOUT, interpreted as whole seconds, defaulting to 0.
func buildAdsConfigSource() (*core.ConfigSource, error) {
	timeoutSeconds, err := env.OrInt("ENVOY_INITIAL_FETCH_TIMEOUT", 0)
	if err != nil {
		return nil, err
	}

	source := &core.ConfigSource{
		InitialFetchTimeout:   &durationpb.Duration{Seconds: int64(timeoutSeconds)},
		ConfigSourceSpecifier: &core.ConfigSource_Ads{Ads: &core.AggregatedConfigSource{}},
		ResourceApiVersion:    core.ApiVersion_V3,
	}
	return source, nil
}
// buildDynamicResourcesForRelayEndpoint builds the bootstrap `dynamic_resources` section
// with ADS served by the relay endpoint.
func buildDynamicResourcesForRelayEndpoint(endpoint string) (*boot.Bootstrap_DynamicResources, error) {
	adsService, err := buildAdsGrpcServiceForRelayEndpoint(endpoint)
	if err == nil {
		return buildDynamicResources(adsService)
	}
	return nil, err
}
// buildRegionalDynamicResources builds the bootstrap `dynamic_resources` section with ADS
// served by a regional endpoint, using the given region and signing name for credentials.
func buildRegionalDynamicResources(endpoint string, region string, signingName string) (*boot.Bootstrap_DynamicResources, error) {
	adsService, err := buildRegionalAdsGrpcService(endpoint, region, signingName)
	if err == nil {
		return buildDynamicResources(adsService)
	}
	return nil, err
}
// buildDynamicResources assembles the bootstrap `dynamic_resources` section: a V3 gRPC ADS
// config backed by the single service `ads`, with LDS and CDS both pointing at the same
// ADS config source instance.
func buildDynamicResources(ads *core.GrpcService) (*boot.Bootstrap_DynamicResources, error) {
	adsSource, err := buildAdsConfigSource()
	if err != nil {
		return nil, err
	}

	adsConfig := &core.ApiConfigSource{
		TransportApiVersion: core.ApiVersion_V3,
		ApiType:             core.ApiConfigSource_GRPC,
		GrpcServices:        []*core.GrpcService{ads},
	}

	return &boot.Bootstrap_DynamicResources{
		AdsConfig: adsConfig,
		LdsConfig: adsSource,
		CdsConfig: adsSource,
	}, nil
}
// getXRayAddressAndPort resolves the address and port of the X-Ray daemon.
//
// The port defaults to XRAY_DAEMON_PORT (2000 when unset) and the address to "127.0.0.1".
// When AWS_XRAY_DAEMON_ADDRESS is set it supplies the address and may also carry a port
// ("1.2.3.4:2000", "[::1]:2000"), which then takes priority over XRAY_DAEMON_PORT.
// A bare IP literal, including an unbracketed IPv6 literal such as "::1", is accepted as
// an address without a port.
//
// An error is returned when XRAY_DAEMON_PORT cannot be read, when the host:port form cannot
// be parsed, or when the resulting address is not a literal IPv4/IPv6 address.
func getXRayAddressAndPort() (string, int, error) {
	port, err := env.OrInt("XRAY_DAEMON_PORT", 2000)
	if err != nil {
		return "", 0, err
	}
	addr := "127.0.0.1"

	v := env.Get("AWS_XRAY_DAEMON_ADDRESS")
	if v == "" {
		return addr, port, nil
	}

	switch {
	case net.ParseIP(v) != nil:
		// Already a complete IP literal. This must be checked first: the colons of a bare
		// IPv6 address (e.g. "::1") also match ipPortRegex and would then be rejected by
		// net.SplitHostPort, although the address is perfectly valid.
		addr = v
	case ipPortRegex.MatchString(v):
		// net.SplitHostPort insists on a port being present, hence the regex pre-check.
		host, p, err := net.SplitHostPort(v)
		if err != nil {
			return "", 0, fmt.Errorf("Could not parse AWS_XRAY_DAEMON_ADDRESS: \"%s\".", v)
		}
		i, err := strconv.ParseInt(p, 10, strconv.IntSize)
		if err != nil {
			return "", 0, fmt.Errorf("Could not parse AWS_XRAY_DAEMON_ADDRESS: \"%s\".", v)
		}
		port = int(i)
		addr = host
	default:
		addr = v
	}

	// The X-Ray address has to be a static IP for now because the extension cannot point
	// to a cluster.
	if net.ParseIP(addr) == nil {
		return "", 0, fmt.Errorf("AWS_XRAY_DAEMON_ADDRESS must be a static IPv4 or IPv6 address such as \"127.0.0.1\".")
	}
	return addr, port, nil
}
// getXraySamplingRuleManifest returns the path of the X-Ray sampling rule manifest to hand
// to Envoy, or "" when none should be configured.
//
// Two inputs are supported, in order of precedence:
//
//  1. XRAY_SAMPLING_RULE_MANIFEST: path of a user-supplied JSON manifest. The file is read
//     through fileUtil and validated with the X-Ray SDK; version 1 manifests are rejected
//     because the Envoy X-Ray extension does not support them. The path itself is returned.
//  2. XRAY_SAMPLING_RATE: a fixed rate between 0 and 1.00. It is rounded to two decimals
//     and written as a version 2 manifest to /tmp/sampling-rules.json, whose path is
//     returned. A rate of 0.05 is a no-op, as that is the X-Ray default.
//
// Manifest format: https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-go-configuration.html#xray-sdk-go-configuration-sampling
//
// Errors are returned for unreadable or invalid manifests, for rates that are not a number
// within [0, 1] (NaN included), and for failures to encode or write the generated manifest.
func getXraySamplingRuleManifest(fileUtil FileUtil) (string, error) {
	const envSrmKey = "XRAY_SAMPLING_RULE_MANIFEST"
	const envSrKey = "XRAY_SAMPLING_RATE"
	const srmFile = "/tmp/sampling-rules.json"
	const defaultFixedTarget = int64(1)
	const defaultVersion = int(2)
	const unsupportedVersion = int(1)

	if v := env.Get(envSrmKey); v != "" {
		// A manifest file was given: validate the JSON it points to.
		data, err := fileUtil.Read(v)
		if err != nil {
			return "", fmt.Errorf("could not read file %s=\"%s\": %w", envSrmKey, v, err)
		}
		// ManifestFromJSONBytes from the X-Ray Go SDK both parses and validates the data.
		ruleManifest, err := sampling.ManifestFromJSONBytes(data)
		if err != nil {
			return "", fmt.Errorf("validation failed for file %s=\"%s\": %w", envSrmKey, v, err)
		}
		// Manifests come in versions 1 and 2, but the Envoy X-Ray extension only handles 2.
		if unsupportedVersion == ruleManifest.Version {
			return "", fmt.Errorf("validation failed for file %s=\"%s\": version %d is not supported", envSrmKey, v, ruleManifest.Version)
		}
		log.Infof("%s is defined as %s, merging it with the x-ray tracing config.", envSrmKey, v)
		return v, nil
	}

	v := env.Get(envSrKey)
	if v == "" {
		return "", nil
	}

	// No manifest file, but a sampling rate: generate sampling-rules.json from it.
	// The fixed rate is a decimal between 0 and 1.00 (100%). NaN has to be rejected
	// explicitly because every ordered comparison against NaN is false.
	fixedRate, err := strconv.ParseFloat(v, 32)
	if err != nil || math.IsNaN(fixedRate) || fixedRate < 0 || fixedRate > 1 {
		return "", fmt.Errorf("%s environment variable (\"%s\") must be a decimal between 0 and 1.00 (100%%)", envSrKey, v)
	}

	// Round to two decimal places.
	fixedRate = math.Round(fixedRate*100) / 100

	// 0.05 (5%) is already the default, so nothing needs to be generated.
	if fixedRate == 0.05 {
		log.Infof("%s is defined as %s, but not creating a sampling manifest as ~0.05 is the X-Ray default", envSrKey, v)
		return "", nil
	}

	localManifest := &sampling.RuleManifest{
		Version: defaultVersion,
		Default: &sampling.Rule{
			Properties: &sampling.Properties{
				FixedTarget: defaultFixedTarget,
				Rate:        fixedRate,
			},
		},
		Rules: []*sampling.Rule{},
	}

	data, err := json.Marshal(localManifest)
	if err != nil {
		return "", err
	}
	if err := fileUtil.Write(srmFile, data, 0644); err != nil {
		return "", err
	}
	log.Infof("%s is defined as %s, localized sampling rate is set to %.2f (%d%%)", envSrKey, v, fixedRate, int(fixedRate*100))
	return srmFile, nil
}
// appendXRayTracing merges an "envoy.tracers.xray" HTTP tracing configuration into b.
//
// The daemon endpoint comes from getXRayAddressAndPort and the optional sampling rule
// manifest from getXraySamplingRuleManifest. The segment name starts out as cluster; when
// nodeId can be parsed into a mesh resource it becomes XRAY_SEGMENT_NAME or, by default,
// "meshName/resourceName", and the mesh and resource names are attached as segment fields.
// A nodeId that cannot be parsed only produces a warning.
func appendXRayTracing(b *boot.Bootstrap, nodeId string, cluster string, fileUtil FileUtil) error {
	addr, port, err := getXRayAddressAndPort()
	if err != nil {
		return err
	}

	daemon := &core.SocketAddress{
		Protocol:      core.SocketAddress_UDP,
		Address:       addr,
		PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(port)},
	}
	xrayCfg := &trace.XRayConfig{
		SegmentName:    cluster,
		DaemonEndpoint: daemon,
		SegmentFields:  &trace.XRayConfig_SegmentFields{Origin: "AWS::AppMesh::Proxy"},
	}

	manifestPath, err := getXraySamplingRuleManifest(fileUtil)
	if err != nil {
		return err
	}
	if manifestPath != "" {
		xrayCfg.SamplingRuleManifest = buildFileDataSource(manifestPath)
	}

	res, err := getMeshResourceFromNodeId(nodeId)
	if err != nil {
		// Not fatal: tracing can still be enabled without a parsed resource name.
		log.Warn(err)
	} else {
		// Default segment name is `meshName/resourceName`, unless the user overrides it
		// through the `XRAY_SEGMENT_NAME` env var.
		xrayCfg.SegmentName = env.Or("XRAY_SEGMENT_NAME", res.MeshName+"/"+res.Name)

		// With the resource known, its names are added to the X-Ray config too.
		// TODO: it is doubtful that x-ray supports "virtual_gateway_name", so for now
		// "virtual_node_name" is always sent. Users should see no difference, since it is
		// only shown as the "AWS::AppMesh::Proxy" name in x-ray.
		//
		// NOTE: in Envoy this field really is a schema-less struct that is forwarded to
		// the xray daemon unmodified.
		awsFields, err := structpb.NewStruct(map[string]interface{}{
			"app_mesh": map[string]interface{}{
				"mesh_name":         res.MeshName,
				"virtual_node_name": res.Name,
			},
		})
		if err != nil {
			return err
		}
		xrayCfg.SegmentFields.Aws = awsFields
	}

	packed, err := anypb.New(xrayCfg)
	if err != nil {
		return err
	}

	tracer := &trace.Tracing_Http{
		Name:       "envoy.tracers.xray",
		ConfigType: &trace.Tracing_Http_TypedConfig{TypedConfig: packed},
	}
	return mergeBootstrap(b, &boot.Bootstrap{
		Tracing: &trace.Tracing{Http: tracer},
	})
}
// appendDataDogTracing merges an "envoy.tracers.datadog" HTTP tracing configuration into b,
// together with the static "datadog_agent" cluster (STRICT_DNS, round robin, 1s connect
// timeout) that the tracer sends to.
//
// The agent is reached at DATADOG_TRACER_ADDRESS:DATADOG_TRACER_PORT (defaults 127.0.0.1
// and 8126). The service name is DD_SERVICE when set; otherwise "envoy-meshName/resourceName"
// if nodeId can be parsed into a mesh resource, and plain "envoy" if it cannot.
func appendDataDogTracing(b *boot.Bootstrap, nodeId string) error {
	const agentCluster = "datadog_agent"

	port, err := env.OrInt("DATADOG_TRACER_PORT", 8126)
	if err != nil {
		return err
	}
	addr := env.Or("DATADOG_TRACER_ADDRESS", "127.0.0.1")

	// Name the Envoy part of the trace the way it is done for X-Ray (meshName/resourceName),
	// but let the DD_SERVICE env var take precedence.
	defaultService := "envoy"
	if res, resErr := getMeshResourceFromNodeId(nodeId); resErr == nil {
		defaultService = "envoy-" + res.MeshName + "/" + res.Name
	}

	packed, err := anypb.New(&trace.DatadogConfig{
		CollectorCluster: agentCluster,
		ServiceName:      env.Or("DD_SERVICE", defaultService),
	})
	if err != nil {
		return err
	}

	agentEndpoint := &endpoint.LbEndpoint{
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: buildTcpSocketAddr(addr, port, false),
			},
		},
	}
	agent := &cluster.Cluster{
		Name:                 agentCluster,
		ConnectTimeout:       &durationpb.Duration{Seconds: 1},
		ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS},
		LbPolicy:             cluster.Cluster_ROUND_ROBIN,
		LoadAssignment: &endpoint.ClusterLoadAssignment{
			ClusterName: agentCluster,
			Endpoints: []*endpoint.LocalityLbEndpoints{
				{LbEndpoints: []*endpoint.LbEndpoint{agentEndpoint}},
			},
		},
	}
	tracer := &trace.Tracing_Http{
		Name:       "envoy.tracers.datadog",
		ConfigType: &trace.Tracing_Http_TypedConfig{TypedConfig: packed},
	}

	return mergeBootstrap(b, &boot.Bootstrap{
		Tracing: &trace.Tracing{Http: tracer},
		StaticResources: &boot.Bootstrap_StaticResources{
			Clusters: []*cluster.Cluster{agent},
		},
	})
}
// appendJaegerTracing merges an "envoy.tracers.zipkin" HTTP tracing configuration into b,
// together with the static "jaeger" cluster (STRICT_DNS, round robin, 1s connect timeout)
// that spans are posted to at /api/v2/spans.
//
// The collector is reached at JAEGER_TRACER_ADDRESS:JAEGER_TRACER_PORT (defaults 127.0.0.1
// and 9411). Spans are emitted in PROTO format unless JAEGER_TRACER_VERSION contains "json"
// (case-insensitive), e.g. for an OTel collector that expects JSON by default.
func appendJaegerTracing(b *boot.Bootstrap) error {
	const collectorCluster = "jaeger"

	port, err := env.OrInt("JAEGER_TRACER_PORT", 9411)
	if err != nil {
		return err
	}
	addr := env.Or("JAEGER_TRACER_ADDRESS", "127.0.0.1")

	wireFormat := trace.ZipkinConfig_HTTP_PROTO
	requested := strings.ToLower(env.Get("JAEGER_TRACER_VERSION"))
	if strings.Contains(requested, "json") {
		wireFormat = trace.ZipkinConfig_HTTP_JSON
	}

	packed, err := anypb.New(&trace.ZipkinConfig{
		CollectorCluster:         collectorCluster,
		CollectorEndpoint:        "/api/v2/spans",
		CollectorEndpointVersion: wireFormat,
		SharedSpanContext:        wrapperspb.Bool(false),
	})
	if err != nil {
		return err
	}

	collectorEndpoint := &endpoint.LbEndpoint{
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: buildTcpSocketAddr(addr, port, false),
			},
		},
	}
	collector := &cluster.Cluster{
		Name:                 collectorCluster,
		ConnectTimeout:       &durationpb.Duration{Seconds: 1},
		ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS},
		LbPolicy:             cluster.Cluster_ROUND_ROBIN,
		LoadAssignment: &endpoint.ClusterLoadAssignment{
			ClusterName: collectorCluster,
			Endpoints: []*endpoint.LocalityLbEndpoints{
				{LbEndpoints: []*endpoint.LbEndpoint{collectorEndpoint}},
			},
		},
	}
	tracer := &trace.Tracing_Http{
		Name:       "envoy.tracers.zipkin",
		ConfigType: &trace.Tracing_Http_TypedConfig{TypedConfig: packed},
	}

	return mergeBootstrap(b, &boot.Bootstrap{
		Tracing: &trace.Tracing{Http: tracer},
		StaticResources: &boot.Bootstrap_StaticResources{
			Clusters: []*cluster.Cluster{collector},
		},
	})
}
// appendSdsSocketCluster merges the static cluster "static_cluster_sds_unix_socket" into b.
// The cluster is STATIC, has HTTP/2 protocol options set (empty), a 1s connect timeout and
// a single endpoint: the pipe at socketPath.
func appendSdsSocketCluster(b *boot.Bootstrap, socketPath string) error {
	const sdsCluster = "static_cluster_sds_unix_socket"

	socketEndpoint := &endpoint.LbEndpoint{
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: &core.Address{
					Address: &core.Address_Pipe{
						Pipe: &core.Pipe{Path: socketPath},
					},
				},
			},
		},
	}

	sds := &cluster.Cluster{
		Name:                 sdsCluster,
		ConnectTimeout:       &durationpb.Duration{Seconds: 1},
		Http2ProtocolOptions: &core.Http2ProtocolOptions{},
		ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STATIC},
		LoadAssignment: &endpoint.ClusterLoadAssignment{
			ClusterName: sdsCluster,
			Endpoints: []*endpoint.LocalityLbEndpoints{
				{LbEndpoints: []*endpoint.LbEndpoint{socketEndpoint}},
			},
		},
	}

	return mergeBootstrap(b, &boot.Bootstrap{
		StaticResources: &boot.Bootstrap_StaticResources{
			Clusters: []*cluster.Cluster{sds},
		},
	})
}
// mergeBootstrap folds the tracing, stats and static-resource parts of src into dst.
//
//   - Tracing and StatsConfig are never merged: each may come from only one side, and it is
//     an error for both dst and src to carry one.
//   - StatsSinks from src are appended to those of dst.
//   - StaticResources are combined with proto.Merge.
//
// The steps run in that order, so dst may already be partially updated when an error is
// returned.
func mergeBootstrap(dst *boot.Bootstrap, src *boot.Bootstrap) error {
	if incoming := src.Tracing; incoming != nil {
		if dst.Tracing != nil {
			return errors.New("Multiple tracing configurations were specified.")
		}
		dst.Tracing = incoming
	}

	if incoming := src.StatsConfig; incoming != nil {
		if dst.StatsConfig != nil {
			return errors.New("Multiple stats configurations were specified.")
		}
		dst.StatsConfig = incoming
	}

	dst.StatsSinks = append(dst.StatsSinks, src.StatsSinks...)

	if src.StaticResources == nil {
		return nil
	}
	if dst.StaticResources == nil {
		dst.StaticResources = &boot.Bootstrap_StaticResources{}
	}
	proto.Merge(dst.StaticResources, src.StaticResources)
	return nil
}
// appendTracing adds the tracing section selected through the environment to
// the bootstrap config. The three ENABLE_ENVOY_*_TRACING switches are read in
// the order X-Ray, Datadog, Jaeger; a value env.Truthy rejects aborts with its
// error. At most one switch may be on. With none on, b is left untouched and
// nil is returned.
func appendTracing(b *boot.Bootstrap, nodeId string, cluster string, fileUtil FileUtil) error {
	useXRay, err := env.Truthy("ENABLE_ENVOY_XRAY_TRACING")
	if err != nil {
		return err
	}
	useDataDog, err := env.Truthy("ENABLE_ENVOY_DATADOG_TRACING")
	if err != nil {
		return err
	}
	useJaeger, err := env.Truthy("ENABLE_ENVOY_JAEGER_TRACING")
	if err != nil {
		return err
	}

	// Only one trace driver can be active at a time.
	enabled := 0
	for _, on := range []bool{useXRay, useDataDog, useJaeger} {
		if on {
			enabled++
		}
	}
	if enabled > 1 {
		return errors.New("Only a single envoy trace driver can be configured; please enable only one of ENABLE_ENVOY_XRAY_TRACING, ENABLE_ENVOY_DATADOG_TRACING or ENABLE_ENVOY_JAEGER_TRACING.")
	}

	switch {
	case useXRay:
		return appendXRayTracing(b, nodeId, cluster, fileUtil)
	case useDataDog:
		return appendDataDogTracing(b, nodeId)
	case useJaeger:
		return appendJaegerTracing(b)
	}
	return nil
}
// appendPortProtocolStatPrefixTagRegex appends three regex based tag
// specifiers to *tags, in this order:
//   - "appmesh.listener_protocol", extracted with listenerProtocolRegex
//   - "appmesh.listener_port", extracted with listenerPortRegex
//   - "envoy_rds_route_config", extracted with envoyRdsRouteConf
//
// The listener tags exist because the listener stat_prefix is changing from
// ingress to ingress.<protocol>.<port>.
func appendPortProtocolStatPrefixTagRegex(tags *[]*metrics.TagSpecifier) {
	protocolTag := &metrics.TagSpecifier{
		TagName:  "appmesh.listener_protocol",
		TagValue: &metrics.TagSpecifier_Regex{Regex: listenerProtocolRegex},
	}
	portTag := &metrics.TagSpecifier{
		TagName:  "appmesh.listener_port",
		TagValue: &metrics.TagSpecifier_Regex{Regex: listenerPortRegex},
	}
	routeConfigTag := &metrics.TagSpecifier{
		TagName:  "envoy_rds_route_config",
		TagValue: &metrics.TagSpecifier_Regex{Regex: envoyRdsRouteConf},
	}
	*tags = append(*tags, protocolTag, portTag, routeConfigTag)
}
// appendStats builds the stats tag specifiers for the node and merges them
// into b as its StatsConfig.
//
// A nodeId containing ":task-set/" is treated as a Service Connect (ECS
// TaskSet) arn, for example
// arn:aws:ecs:us-west-2:123456789012:task-set/MyCluster/MyService/ecs-svc/1234567890123456789,
// and only receives the Service Connect tag regexes. Any other nodeId is
// parsed as a mesh resource; when ENABLE_ENVOY_STATS_TAGS is truthy the fixed
// tags "appmesh.mesh" and "appmesh.<resource type>" are added, followed by
// the App Mesh tag regexes and the listener port/protocol regexes.
//
// When no tag was produced b is left untouched.
func appendStats(b *boot.Bootstrap, nodeId string) error {
	tags := []*metrics.TagSpecifier{}

	if strings.Contains(nodeId, ":task-set/") {
		metric_filter.AppendStatsTagRegexForServiceConnect(&tags)
	} else {
		res, err := getMeshResourceFromNodeId(nodeId)
		if err != nil {
			return err
		}
		withFixedTags, err := env.Truthy("ENABLE_ENVOY_STATS_TAGS")
		if err != nil {
			return err
		}
		if withFixedTags {
			meshTag := &metrics.TagSpecifier{
				TagName:  "appmesh.mesh",
				TagValue: &metrics.TagSpecifier_FixedValue{FixedValue: res.MeshName},
			}
			resourceTag := &metrics.TagSpecifier{
				TagName:  "appmesh." + res.SnakeCaseType,
				TagValue: &metrics.TagSpecifier_FixedValue{FixedValue: res.Name},
			}
			tags = append(tags, meshTag, resourceTag)
		}
		metric_filter.AppendStatsTagRegexForAppMesh(&tags, res)
		appendPortProtocolStatPrefixTagRegex(&tags)
	}

	// Nothing to configure: do not create an empty StatsConfig.
	if len(tags) == 0 {
		return nil
	}
	return mergeBootstrap(b, &boot.Bootstrap{
		StatsConfig: &metrics.StatsConfig{StatsTags: tags},
	})
}
// appendStatsFlushInterval parses interval with time.ParseDuration (for
// example "5s" or "1m30s") and stores it as b.StatsFlushInterval. A string
// that does not parse, or a duration that fails the protobuf Duration
// validity check, is returned as an error and b is left unchanged.
func appendStatsFlushInterval(b *boot.Bootstrap, interval string) error {
	parsed, err := time.ParseDuration(interval)
	if err != nil {
		return err
	}

	flushEvery := durationpb.New(parsed)
	if err = flushEvery.CheckValid(); err != nil {
		return err
	}

	b.StatsFlushInterval = flushEvery
	return nil
}
// appendDogStatsDSinks adds an "envoy.stat_sinks.dog_statsd" stats sink to b.
//
// The sink address is a socket pipe when STATSD_SOCKET_PATH is set to a
// non-empty value. Otherwise it is a UDP address built from
// env.Or("STATSD_ADDRESS", "127.0.0.1") and env.OrInt("STATSD_PORT", 8125);
// an error from env.OrInt is returned as-is.
func appendDogStatsDSinks(b *boot.Bootstrap) error {
	// Pick the sink destination first, then pack it once below.
	var sink *metrics.DogStatsdSink
	if udsPath := env.Get("STATSD_SOCKET_PATH"); udsPath != "" {
		sink = &metrics.DogStatsdSink{
			DogStatsdSpecifier: &metrics.DogStatsdSink_Address{
				Address: buildSocketPipe(udsPath),
			},
		}
	} else {
		host := env.Or("STATSD_ADDRESS", "127.0.0.1")
		port, err := env.OrInt("STATSD_PORT", 8125)
		if err != nil {
			return err
		}
		sink = &metrics.DogStatsdSink{
			DogStatsdSpecifier: &metrics.DogStatsdSink_Address{
				Address: buildUdpSocketAddr(host, port),
			},
		}
	}

	packedCfg, err := anypb.New(sink)
	if err != nil {
		return err
	}

	return mergeBootstrap(b, &boot.Bootstrap{
		StatsSinks: []*metrics.StatsSink{
			{
				Name: "envoy.stat_sinks.dog_statsd",
				ConfigType: &metrics.StatsSink_TypedConfig{
					TypedConfig: packedCfg,
				},
			},
		},
	})
}
// mergeStaticResourcesMaps merges the static resources found under
// staticResourcesKey in src into dst and returns dst.
//
//   - src has no static resources map: dst is returned unchanged.
//   - dst has no static resources map: the one from src is stored in dst.
//   - both have one: the "listeners", "clusters" and "secrets" entries of src
//     are added to those of dst via extendDstMapInterfaceWithSrcForAKey.
//
// The fields correspond to config.bootstrap.v3.Bootstrap.StaticResources, see
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/bootstrap/v3/bootstrap.proto#config-bootstrap-v3-bootstrap-staticresources
//
// The returned error is currently always nil.
func mergeStaticResourcesMaps(dst, src map[string]interface{}) (map[string]interface{}, error) {
	srcResources, srcHas := src[staticResourcesKey].(map[string]interface{})
	if !srcHas {
		// Nothing to merge.
		return dst, nil
	}

	dstResources, dstHas := dst[staticResourcesKey].(map[string]interface{})
	if !dstHas {
		// dst has nothing of its own yet, take over src's resources wholesale.
		dst[staticResourcesKey] = srcResources
		return dst, nil
	}

	for _, section := range []string{"listeners", "clusters", "secrets"} {
		dstResources = extendDstMapInterfaceWithSrcForAKey(dstResources, srcResources, section)
	}
	dst[staticResourcesKey] = dstResources
	return dst, nil
}
// mergeCustomConfigMaps merges a user supplied bootstrap config (src) into the
// bootstrap config built so far (dst) and returns the result. Both are plain
// YAML-decoded maps; no protobuf validation takes place.
//
// Steps, in order:
//  1. tracing: copied from src; an error if dst already has a tracing entry.
//  2. stats config: same rule as tracing, after normalizing the src key to
//     camelCase.
//  3. stats sinks: src entries are appended to dst.
//  4. static resources: merged through mergeStaticResourcesMaps. The key may
//     be given as staticResources or static_resources.
//  5. everything else: the four keys above are deleted from src and the
//     remainder is merged with mergeDstMapInterfaceWithSrc. The full list of
//     bootstrap fields is documented at
//     https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/bootstrap/v3/bootstrap.proto#config-bootstrap-v3-bootstrap
//
// A nil map is returned together with any error.
func mergeCustomConfigMaps(dst, src map[string]interface{}) (map[string]interface{}, error) {
	var err error

	// Tracing is a single word key, so it needs no case normalization.
	if tracingCfg, present := src[tracingKey]; present {
		if _, clash := dst[tracingKey]; clash {
			return nil, fmt.Errorf("multiple tracing configurations were specified")
		}
		dst[tracingKey] = tracingCfg
		log.Info("Tracing config merged")
	}

	// Stats config must not be merged either.
	if src, err = normalizeMapKeyToCamelCase(src, statsConfigKey); err != nil {
		return nil, err
	}
	if statsCfg, present := src[statsConfigKey]; present {
		if _, clash := dst[statsConfigKey]; clash {
			return nil, fmt.Errorf("multiple stats configurations were specified")
		}
		dst[statsConfigKey] = statsCfg
		log.Info("Stats config merged")
	}

	// Stats sinks are appended.
	if src, err = normalizeMapKeyToCamelCase(src, statsSinksKey); err != nil {
		return nil, err
	}
	dst = extendDstMapInterfaceWithSrcForAKey(dst, src, statsSinksKey)

	// Static resources can be part of any supplied config, so they are merged
	// for every source. Normalization errors out when both spellings of the
	// key are present.
	if src, err = normalizeMapKeyToCamelCase(src, staticResourcesKey); err != nil {
		return nil, err
	}
	if dst, err = mergeStaticResourcesMaps(dst, src); err != nil {
		return nil, err
	}

	// The explicitly handled keys must not take part in the generic merge.
	for _, handled := range []string{tracingKey, statsConfigKey, statsSinksKey, staticResourcesKey} {
		delete(src, handled)
	}
	merged, err := mergeDstMapInterfaceWithSrc(dst, src)
	if err != nil {
		return nil, err
	}
	return merged, nil
}
// mergeCustomConfigs merges user supplied config files into the generated
// bootstrap YAML and returns the resulting YAML. Everything is handled as
// generic maps, without protobuf validation.
//
// The files named by ENVOY_TRACING_CFG_FILE, ENVOY_STATS_CONFIG_FILE and
// ENVOY_STATS_SINKS_CFG_FILE are merged in that order. Such a file must parse
// as YAML and must not be empty. After merging ENVOY_STATS_SINKS_CFG_FILE the
// combined config must contain a stats sinks entry.
//
// The file named by ENVOY_RESOURCES_CONFIG_FILE is merged last. Unlike the
// others, an empty file only produces a warning.
//
// bootYaml may be nil, in which case merging starts from an empty config.
func mergeCustomConfigs(bootYaml []byte, fileUtil FileUtil) ([]byte, error) {
	merged := make(map[string]interface{})
	if bootYaml != nil {
		if err := yaml.Unmarshal(bootYaml, &merged); err != nil {
			return nil, err
		}
	}

	// readConfig reads the file at path and decodes it as YAML. Read errors
	// are returned as-is, parse errors are wrapped with the variable name.
	readConfig := func(envVar, path string) (map[string]interface{}, error) {
		raw, err := fileUtil.Read(path)
		if err != nil {
			return nil, err
		}
		var parsed map[string]interface{}
		if err := yaml.Unmarshal(raw, &parsed); err != nil {
			return nil, fmt.Errorf("failure to parse %s %s for the reason: %w", envVar, path, err)
		}
		return parsed, nil
	}

	overrideVars := []string{
		"ENVOY_TRACING_CFG_FILE",     // tracing
		"ENVOY_STATS_CONFIG_FILE",    // stats
		"ENVOY_STATS_SINKS_CFG_FILE", // stats sinks
	}
	for _, envVar := range overrideVars {
		path := env.Get(envVar)
		if path == "" {
			continue
		}
		log.Infof("%s is defined as %s, merging it with the envoy config.", envVar, path)

		custom, err := readConfig(envVar, path)
		if err != nil {
			return nil, err
		}
		if len(custom) == 0 {
			return nil, fmt.Errorf("Specified %s %s is empty.", envVar, path)
		}

		// We have found some users putting unrelated configs in these files,
		// so we best treat it like generic envoy config.
		if merged, err = mergeCustomConfigMaps(merged, custom); err != nil {
			return nil, fmt.Errorf("failure to merge %s %s into envoy config for the reason: %w", envVar, path, err)
		}

		// Special handling per variable.
		if envVar == "ENVOY_STATS_SINKS_CFG_FILE" {
			// TODO: Does it make sense to always error here?
			if _, hasSinks := merged[statsSinksKey]; !hasSinks {
				return nil, fmt.Errorf("%s was specified as %s, but contained no valid stats_sinks configuration.", envVar, path)
			}
		}
	}

	// Explicit resources supplied via ENVOY_RESOURCES_CONFIG_FILE. This is
	// kept outside the loop above because an empty file is treated
	// differently here.
	// TODO: if you feel like this can be inside the 'for' loop feel free move it in: always 'less code' ==> 'less bugs'.
	const resourcesVar = "ENVOY_RESOURCES_CONFIG_FILE"
	if path := env.Get(resourcesVar); path != "" {
		log.Infof("Explicitly appending supplied Envoy resources %s %s.", resourcesVar, path)

		resources, err := readConfig(resourcesVar, path)
		if err != nil {
			return nil, err
		}
		if len(resources) == 0 {
			log.Warnf("Specified %s %s is empty, but will ignore and continue.", resourcesVar, path)
		} else if merged, err = mergeCustomConfigMaps(merged, resources); err != nil {
			return nil, fmt.Errorf("failure to merge %s %s into envoy config for the reason: %w", resourcesVar, path, err)
		}
	}

	out, err := yaml.Marshal(merged)
	if err != nil {
		return nil, err
	}
	return out, nil
}
// convertToYAML renders the bootstrap message as YAML. The message is first
// marshalled to JSON with protojson, converted to YAML, and then passed
// through mergeCustomConfigs so that user supplied config files are merged
// in. An empty string is returned together with any error.
func convertToYAML(b *boot.Bootstrap, fileUtil FileUtil) (string, error) {
	asJSON, err := protojson.Marshal(b)
	if err != nil {
		return "", err
	}

	generated, err := yaml.JSONToYAML(asJSON)
	if err != nil {
		return "", err
	}

	final, err := mergeCustomConfigs(generated, fileUtil)
	if err != nil {
		return "", err
	}
	return string(final), nil
}
// buildMetadataForNode gathers the node metadata and returns it as a
// structpb.Struct.
//
// Sources are consulted in a fixed order: static runtime config (stored under
// runtimeMetadataNamespace), network interface info, metric filter options,
// platform info, ECS container info and listener info. Every source is best
// effort: when it fails a warning is logged and the source is skipped. A key
// written by a later source replaces the same key from an earlier one.
//
// The only error returned is the one from structpb.NewStruct.
func buildMetadataForNode() (*structpb.Struct, error) {
	nodeMeta := make(map[string]interface{})

	if runtimeCfg, err := getRuntimeConfigLayer0(); err == nil {
		nodeMeta[runtimeMetadataNamespace] = runtimeCfg
	} else {
		log.Warnf("Could not collect static runtime info: %s", err)
	}

	if ifaces, err := netinfo.BuildMapWithInterfaceInfo(); err == nil {
		for key, val := range *ifaces {
			nodeMeta[key] = val
		}
	} else {
		log.Warnf("Could not collect network info: %s", err)
	}

	if filterOpts, err := metric_filter.BuildMetadata(); err == nil {
		for key, val := range *filterOpts {
			nodeMeta[key] = val
		}
	} else {
		log.Warnf("Could not determine metric filter options: %s", err)
	}

	if platform, err := platforminfo.BuildMetadata(); err == nil {
		for key, val := range *platform {
			nodeMeta[key] = val
		}
	} else {
		log.Warnf("Could not collect platform info: %s", err)
	}

	if container, err := applicationinfo.BuildMetadata(); err == nil {
		for key, val := range container {
			nodeMeta[key] = val
		}
	} else {
		log.Warnf("Could not collect ecs container info: %s", err)
	}

	if listeners, err := listenerinfo.BuildMetadata(); err == nil {
		for key, val := range listeners {
			nodeMeta[key] = val
		}
	} else {
		log.Warnf("Could not collect listener info: %v", err)
	}

	return structpb.NewStruct(nodeMeta)
}
// bootstrap assembles the generated part of the Envoy bootstrap config.
//
// It resolves the node id and cluster, then builds the admin section, the
// dynamic resources, the layered runtime and the node metadata. Dynamic
// resources point at the relay unix domain socket when
// agentConfig.XdsEndpointUdsPath is set; otherwise they point at the regional
// xDS endpoint, which requires the region and the signing name.
//
// Optional sections are appended afterwards:
//   - tracing, unless ENVOY_TRACING_CFG_FILE is set
//   - the static SDS cluster, when APPMESH_SDS_SOCKET_PATH is set
//   - stats tags, unless ENVOY_STATS_CONFIG_FILE is set
//   - the stats flush interval, when ENVOY_STATS_FLUSH_INTERVAL is set
//   - the DogStatsD sink, when ENABLE_ENVOY_DOG_STATSD is truthy and
//     ENVOY_STATS_SINKS_CFG_FILE is not set
//
// On failure the returned bootstrap is nil and the error is non-nil. On
// success the returned bootstrap is never nil.
func bootstrap(agentConfig config.AgentConfig, fileUtil FileUtil, envoyCLIInst EnvoyCLI) (*boot.Bootstrap, error) {
	// Generate new config
	id, err := getNodeId()
	if err != nil {
		return nil, err
	}
	clusterId := getNodeCluster(id)

	admin, err := buildAdmin(agentConfig)
	if err != nil {
		return nil, err
	}

	var dr *boot.Bootstrap_DynamicResources
	if agentConfig.XdsEndpointUdsPath != "" {
		dr, err = buildDynamicResourcesForRelayEndpoint(agentConfig.XdsEndpointUdsPath)
		if err != nil {
			return nil, err
		}
	} else {
		region, err := getRegion()
		if err != nil {
			return nil, err
		}
		xdsEndpoint, err := getRegionalXdsEndpoint(region, envoyCLIInst)
		if err != nil {
			return nil, err
		}
		if xdsEndpoint == nil {
			// A missing endpoint without an error must not be reported as
			// success: the caller would receive a nil bootstrap together
			// with a nil error and go on to render it.
			return nil, fmt.Errorf("no xDS endpoint could be determined for region %v", region)
		}
		signingName, err := getSigningName()
		if err != nil {
			return nil, err
		}
		dr, err = buildRegionalDynamicResources(*xdsEndpoint, region, signingName)
		if err != nil {
			return nil, err
		}
	}

	lr, err := buildLayeredRuntime()
	if err != nil {
		return nil, err
	}
	metadata, err := buildMetadataForNode()
	if err != nil {
		return nil, err
	}

	b := &boot.Bootstrap{
		Admin:            admin,
		Node:             buildNode(id, clusterId, metadata),
		LayeredRuntime:   lr,
		DynamicResources: dr,
		ClusterManager:   buildClusterManager(),
	}

	// Tracing: only generated when the user did not supply a tracing file.
	if v := env.Get("ENVOY_TRACING_CFG_FILE"); v == "" {
		if err := appendTracing(b, id, clusterId, fileUtil); err != nil {
			return nil, err
		}
	}

	// Unix Domain Socket for SDS Based TLS
	if v := env.Get("APPMESH_SDS_SOCKET_PATH"); v != "" {
		log.Info("APPMESH_SDS_SOCKET_PATH is defined, generating static sds cluster.")
		if err := appendSdsSocketCluster(b, v); err != nil {
			return nil, err
		}
	}

	// Stats: only generated when the user did not supply a stats config file.
	if env.Get("ENVOY_STATS_CONFIG_FILE") == "" {
		if err := appendStats(b, id); err != nil {
			return nil, err
		}
	}

	// Stats Flush Interval
	if v := env.Get("ENVOY_STATS_FLUSH_INTERVAL"); v != "" {
		if err := appendStatsFlushInterval(b, v); err != nil {
			return nil, err
		}
	}

	// Stats Sinks: a user supplied sinks file takes precedence over DogStatsD.
	enableDogStatsd, err := env.Truthy("ENABLE_ENVOY_DOG_STATSD")
	if err != nil {
		return nil, err
	}
	if v := env.Get("ENVOY_STATS_SINKS_CFG_FILE"); v == "" && enableDogStatsd {
		if err := appendDogStatsDSinks(b); err != nil {
			return nil, err
		}
	}

	return b, nil
}
// GetBootstrapYaml generates the Envoy bootstrap config for agentConfig and
// returns it as a YAML string. It uses the real file utility and an Envoy CLI
// wrapper built from agentConfig.CommandPath. A failure in either the
// generation or the YAML conversion step is logged and returned together
// with an empty string.
func GetBootstrapYaml(agentConfig config.AgentConfig) (string, error) {
	files := &fileUtil{}
	cli := &envoyCLI{agentConfig.CommandPath}

	generated, err := bootstrap(agentConfig, files, cli)
	if err != nil {
		log.Errorf("Cannot generate bootstrap config. %v", err)
		return "", err
	}

	rendered, err := convertToYAML(generated, files)
	if err != nil {
		log.Errorf("Cannot convert bootstrap config to yaml. %v", err)
		return "", err
	}
	return rendered, nil
}
// validateEnvoyConfigPath verifies that configPath can be handed to Envoy as
// its config file. The path must exist, must be a regular file and must not
// be empty. os.Lstat is used, so a symbolic link is rejected instead of being
// followed.
//
// It returns nil when the file is acceptable. Unless the bootstrap generation
// failed, this is the expected outcome. When the path cannot be inspected the
// underlying error is wrapped, so callers can still test for the cause, for
// example with errors.Is(err, fs.ErrNotExist).
func validateEnvoyConfigPath(configPath string) error {
	// Verify that configPath is a file on disk
	statInfo, err := os.Lstat(configPath)
	if err != nil {
		return fmt.Errorf("Unable to verify %s is a valid disk file: %w", configPath, err)
	}

	// An empty file carries no configuration and is rejected like a
	// non-regular file.
	if !statInfo.Mode().IsRegular() || statInfo.Size() == 0 {
		return fmt.Errorf("Unable to establish %s is a regular disk file", configPath)
	}
	return nil
}
// CreateBootstrapYamlFile makes sure the file at agentConfig.EnvoyConfigPath
// holds an Envoy bootstrap config.
//
// The file must already exist. When it is empty, a bootstrap config is
// generated with GetBootstrapYaml and written to it. A non-empty file is
// taken to already contain a bootstrap config and is never overwritten. In
// both cases the file is validated with validateEnvoyConfigPath before
// returning.
func CreateBootstrapYamlFile(agentConfig config.AgentConfig) error {
	statInfo, err := os.Lstat(agentConfig.EnvoyConfigPath)
	if err != nil {
		// Keep the cause (missing file, permission, ...) in the error
		// instead of discarding it.
		return fmt.Errorf("Cannot get stats info of bootstrap config file %s: %w", agentConfig.EnvoyConfigPath, err)
	}

	// A non-empty file already contains bootstrap configuration, we will not overwrite it.
	if statInfo.Size() == 0 {
		envoyConfigYaml, err := GetBootstrapYaml(agentConfig)
		if err != nil {
			return err
		}
		if err := os.WriteFile(agentConfig.EnvoyConfigPath, []byte(envoyConfigYaml), 0644); err != nil {
			return fmt.Errorf("Cannot write bootstrap config to file. %w", err)
		}
	}

	return validateEnvoyConfigPath(agentConfig.EnvoyConfigPath)
}
// setRelayBootstrapEnvVariables exports the environment variables that the
// relay bootstrap YAML expands, filling in a default for each one the user
// did not define:
//
//   - APPNET_RELAY_LISTENER_UDS_PATH, APPNET_MANAGEMENT_PORT,
//     RELAY_STREAM_IDLE_TIMEOUT, RELAY_BUFFER_LIMIT_BYTES and
//     ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS default to the matching
//     agentConfig field.
//   - AWS_REGION defaults to the result of getRegion.
//   - APPNET_MANAGEMENT_DOMAIN_NAME is always written: from
//     agentConfig.AppNetManagementDomainName when that is non-empty, otherwise
//     as ecs-sc[-fips].<region>.<xds domain>, depending on the FIPS check.
//
// An error is returned when the region cannot be determined or the FIPS
// compatibility check fails. Errors from os.Setenv are not checked.
func setRelayBootstrapEnvVariables(agentConfig config.AgentConfig, envoyCLIInst EnvoyCLI) error {
	// unset reports whether name is absent from the environment. A variable
	// that is set to the empty string counts as present.
	unset := func(name string) bool {
		_, present := os.LookupEnv(name)
		return !present
	}

	if unset("APPNET_RELAY_LISTENER_UDS_PATH") {
		log.Infof("APPNET_RELAY_LISTENER_UDS_PATH is not set, setting default value as: %v", agentConfig.AppNetRelayListenerUdsPath)
		os.Setenv("APPNET_RELAY_LISTENER_UDS_PATH", agentConfig.AppNetRelayListenerUdsPath)
	}

	if unset("AWS_REGION") {
		region, err := getRegion()
		if err != nil {
			return fmt.Errorf("failed to get region from the environment: %v", err)
		}
		log.Infof("AWS_REGION is not set, setting default value as: %v", region)
		os.Setenv("AWS_REGION", region)
	}

	if configured := agentConfig.AppNetManagementDomainName; configured != "" {
		os.Setenv("APPNET_MANAGEMENT_DOMAIN_NAME", configured)
	} else {
		region, err := getRegion()
		if err != nil {
			return fmt.Errorf("failed to get region from the environment: %v", err)
		}
		fips, err := isFipsCompatible(region, envoyCLIInst)
		if err != nil {
			return fmt.Errorf("failed to verify FIPS compatibility: %v", err)
		}

		var defaultDomain string
		if fips {
			defaultDomain = fmt.Sprintf("ecs-sc-fips.%s.%s", region, getXdsDomain(region, true))
		} else {
			defaultDomain = fmt.Sprintf("ecs-sc.%s.%s", region, getXdsDomain(region, true))
		}
		log.Infof("APPNET_MANAGEMENT_DOMAIN_NAME is not set, setting default value as: %v", defaultDomain)
		os.Setenv("APPNET_MANAGEMENT_DOMAIN_NAME", defaultDomain)
	}

	if unset("APPNET_MANAGEMENT_PORT") {
		log.Infof("APPNET_MANAGEMENT_PORT is not set, setting default value as: %v", agentConfig.AppNetManagementPort)
		os.Setenv("APPNET_MANAGEMENT_PORT", fmt.Sprint(agentConfig.AppNetManagementPort))
	}

	if unset("RELAY_STREAM_IDLE_TIMEOUT") {
		log.Infof("RELAY_STREAM_IDLE_TIMEOUT is not set, setting default value as: %v", agentConfig.RelayStreamIdleTimeout)
		os.Setenv("RELAY_STREAM_IDLE_TIMEOUT", fmt.Sprint(agentConfig.RelayStreamIdleTimeout))
	}

	if unset("RELAY_BUFFER_LIMIT_BYTES") {
		log.Infof("RELAY_BUFFER_LIMIT_BYTES is not set, setting default value as: %v", agentConfig.RelayBufferLimitBytes)
		os.Setenv("RELAY_BUFFER_LIMIT_BYTES", fmt.Sprint(agentConfig.RelayBufferLimitBytes))
	}

	if unset("ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS") {
		log.Infof("ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS is not set, setting default value as: %v", agentConfig.EnvoyUseHttpClientToFetchAwsCredentials)
		os.Setenv("ENVOY_USE_HTTP_CLIENT_TO_FETCH_AWS_CREDENTIALS", fmt.Sprint(agentConfig.EnvoyUseHttpClientToFetchAwsCredentials))
	}

	return nil
}
// GetRelayBootstrapYaml returns the relay bootstrap config with its
// environment variable references expanded.
//
// The template is read from agent-resources/bootstrap_configs/relay_bootstrap.yaml
// through fileUtil. setRelayBootstrapEnvVariables then exports the variables
// the template relies on, and os.ExpandEnv substitutes every $var / ${var}
// reference with the value from the environment.
func GetRelayBootstrapYaml(agentConfig config.AgentConfig, fileUtil FileUtil, envoyCLIInst EnvoyCLI) ([]byte, error) {
	templatePath := filepath.Join("agent-resources", "bootstrap_configs", "relay_bootstrap.yaml")
	template, err := fileUtil.Read(templatePath)
	if err != nil {
		return nil, fmt.Errorf("Cannot read relay bootstrap config file. %v", err)
	}

	if err := setRelayBootstrapEnvVariables(agentConfig, envoyCLIInst); err != nil {
		return nil, fmt.Errorf("Failed to read relay bootstrap environment variables. %v", err)
	}

	// Replace ${var} in the config with the environment variable values
	expanded := os.ExpandEnv(string(template))
	return []byte(expanded), nil
}
// CreateRelayBootstrapYamlFile writes the relay bootstrap config to
// agentConfig.EnvoyConfigPath.
//
// The path must already exist. Unlike CreateBootstrapYamlFile, existing
// content is not preserved: the file is removed (a failure to remove it is
// only logged) and written again with the config from GetRelayBootstrapYaml.
// The written file is then validated with validateEnvoyConfigPath and that
// result is returned.
func CreateRelayBootstrapYamlFile(agentConfig config.AgentConfig) error {
	if _, err := os.Lstat(agentConfig.EnvoyConfigPath); err != nil {
		// Keep the cause (missing file, permission, ...) in the error
		// instead of discarding it.
		return fmt.Errorf("Cannot get stat info of relay bootstrap config file %s: %w", agentConfig.EnvoyConfigPath, err)
	}

	fileUtilInst := &fileUtil{}
	envoyCLIInst := &envoyCLI{agentConfig.CommandPath}
	envoyConfigYaml, err := GetRelayBootstrapYaml(agentConfig, fileUtilInst, envoyCLIInst)
	if err != nil {
		return err
	}

	// If there's already a non-empty file present at this location, we'll delete and recreate it.
	if err := os.Remove(agentConfig.EnvoyConfigPath); err != nil && !os.IsNotExist(err) {
		log.Warnf("Failed to remove existing Envoy config file at: [%s]. Overwriting with relay config. %v.", agentConfig.EnvoyConfigPath, err)
	}

	if err := os.WriteFile(agentConfig.EnvoyConfigPath, envoyConfigYaml, 0644); err != nil {
		return fmt.Errorf("Cannot write relay bootstrap config to file. %w", err)
	}

	// Validate what was just written, so an unusable config file is reported
	// here rather than when Envoy starts.
	return validateEnvoyConfigPath(agentConfig.EnvoyConfigPath)
}