v2/internal/config/vars.go (213 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package config
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/rotisserie/eris"
"github.com/Azure/azure-service-operator/v2/pkg/common/annotations"
asocloud "github.com/Azure/azure-service-operator/v2/pkg/common/cloud"
"github.com/Azure/azure-service-operator/v2/pkg/common/config"
)
var (
	// DefaultMaxConcurrentReconciles is the MaxConcurrentReconciles value used by
	// ReadFromEnvironment when the corresponding environment variable is unset or empty.
	DefaultMaxConcurrentReconciles = 1
)
// NOTE: Changes to documentation or available values here should be documented in Helm values.yaml as well

// Values stores configuration values that are set for the operator.
type Values struct {
	// SubscriptionID is the Azure subscription the operator will use
	// for ARM communication.
	SubscriptionID string

	// TenantID is the Azure tenant ID the operator will use
	// for ARM communication.
	TenantID string

	// AdditionalTenants is the set of allowed additional tenants,
	// used for cross-tenant auth.
	AdditionalTenants []string

	// ClientID is the Azure client ID the operator will use
	// for ARM communication.
	ClientID string

	// PodNamespace is the namespace the operator pods are running in.
	PodNamespace string

	// OperatorMode determines whether the operator should run
	// watchers, webhooks or both.
	OperatorMode OperatorMode

	// TargetNamespaces lists the namespaces the operator will watch
	// for Azure resources (if the mode includes running watchers). If
	// it's empty the operator will watch all namespaces.
	TargetNamespaces []string

	// SyncPeriod is the frequency at which resources are re-reconciled with Azure
	// when there have been no triggering changes in the Kubernetes resources. This sync
	// exists to detect and correct changes that happened in Azure that Kubernetes is not
	// aware of. BE VERY CAREFUL setting this value low - even a modest number of resources
	// can cause subscription level throttling if they are re-synced frequently.
	// If nil, no sync is performed. Durations are specified as "1h", "15m", or "60s". See
	// https://pkg.go.dev/time#ParseDuration for more details. When AZURE_SYNC_PERIOD is
	// unset or empty, the period defaults to 1h.
	//
	// Specify the special value "never" for AZURE_SYNC_PERIOD to prevent syncing.
	SyncPeriod *time.Duration

	// ResourceManagerEndpoint is the Azure Resource Manager endpoint.
	// If not specified, the default is the Public cloud resource manager endpoint.
	// See https://docs.microsoft.com/cli/azure/manage-clouds-azure-cli#list-available-clouds for details
	// about how to find available resource manager endpoints for your cloud. Note that the resource manager
	// endpoint is referred to as "resourceManager" in the Azure CLI.
	ResourceManagerEndpoint string

	// ResourceManagerAudience is the Azure Resource Manager AAD audience.
	// If not specified, the default is the Public cloud resource manager audience https://management.core.windows.net/.
	// See https://docs.microsoft.com/cli/azure/manage-clouds-azure-cli#list-available-clouds for details
	// about how to find available resource manager audiences for your cloud. Note that the resource manager
	// audience is referred to as "activeDirectoryResourceId" in the Azure CLI.
	ResourceManagerAudience string

	// AzureAuthorityHost is the URL of the AAD authority. If not specified, the default
	// is the AAD URL for the public cloud: https://login.microsoftonline.com/. See
	// https://docs.microsoft.com/azure/active-directory/develop/authentication-national-cloud
	AzureAuthorityHost string

	// UseWorkloadIdentityAuth determines whether Workload Identity authentication is
	// used for the global credential.
	UseWorkloadIdentityAuth bool

	// UserAgentSuffix is appended to the default User-Agent for Azure HTTP clients.
	UserAgentSuffix string

	// MaxConcurrentReconciles is the number of threads/goroutines dedicated to reconciling each resource type.
	// If not specified, DefaultMaxConcurrentReconciles (1) is used.
	// IMPORTANT: Having MaxConcurrentReconciles set to N does not mean that ASO is limited to N interactions with
	// Azure at any given time, because the control loop yields to another resource while it is not actively issuing HTTP
	// calls to Azure. Any single resource only blocks the control-loop for its resource-type for as long as it takes to issue
	// an HTTP call to Azure, view the result, and make a decision. In most cases the time taken to perform these actions
	// (and thus how long the loop is blocked and preventing other resources from being acted upon) is a few hundred
	// milliseconds to at most a second or two. In a typical 60s period, many hundreds or even thousands of resources
	// can be managed with this set to 1.
	// MaxConcurrentReconciles applies to every registered resource type being watched/managed by ASO.
	MaxConcurrentReconciles int

	// RateLimit configures ASO's internal rate-limiting of reconciliations
	// (mode, refill rate and bucket size). See the RateLimit type for details.
	RateLimit RateLimit

	// DefaultReconcilePolicy overrides the reconcile policy ASO uses when the
	// serviceoperator.azure.com/reconcile-policy annotation is omitted from a resource.
	DefaultReconcilePolicy annotations.ReconcilePolicyValue
}
// RateLimitMode selects how the operator rate-limits its own reconciliations.
type RateLimitMode string

const (
	// RateLimitModeDisabled turns off ASO-controlled rate-limiting.
	RateLimitModeDisabled RateLimitMode = "disabled"
	// RateLimitModeBucket rate-limits reconciliations using a token bucket.
	RateLimitModeBucket RateLimitMode = "bucket"
)
// ParseRateLimitMode converts s into a RateLimitMode. Matching is exact and
// case-sensitive: only "disabled" and "bucket" are accepted; any other input
// yields an empty mode and an error.
func ParseRateLimitMode(s string) (RateLimitMode, error) {
	candidate := RateLimitMode(s)
	if candidate == RateLimitModeDisabled || candidate == RateLimitModeBucket {
		return candidate, nil
	}
	return "", eris.Errorf("invalid rate limit mode %q", s)
}
// RateLimit configures the operator's internal rate-limiting of reconciliations.
type RateLimit struct {
	// Mode configures the internal rate-limiting mode.
	// Valid values are [disabled, bucket]
	//   - disabled: No ASO-controlled rate-limiting occurs. ASO will attempt to communicate with Azure and
	//     kube-apiserver as much as needed based on load. It will back off based on throttling from
	//     either kube-apiserver or Azure, but will not artificially limit its throughput.
	//   - bucket: Uses a token-bucket algorithm to rate-limit reconciliations. Note that this limits how often
	//     the operator performs a reconciliation, but not every reconciliation triggers a call to kube-apiserver
	//     or Azure (though many do). Since this controls reconciles it can be used to coarsely control throughput
	//     and CPU usage of the operator, as well as the number of requests that the operator issues to Azure.
	//     Keep in mind that the Azure throttling limits (defined at
	//     https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/request-limits-and-throttling)
	//     differentiate between request types. Since a given reconcile for a resource may result in polling (a GET) or
	//     modification (a PUT) it's not possible to entirely avoid Azure throttling by tuning these bucket limits.
	//     We don't recommend enabling this mode by default.
	//     If enabling this mode, we strongly recommend doing some experimentation to tune these values to something that
	//     works for your specific need.
	Mode RateLimitMode

	// QPS is the rate (per second) at which the bucket is refilled. This value only has an effect if Mode is 'bucket'.
	// When read from the environment and not specified, it defaults to 5.
	QPS float64

	// BucketSize is the size of the bucket. This value only has an effect if Mode is 'bucket'.
	// When read from the environment and not specified, it defaults to 100.
	BucketSize int
}
// String renders the rate-limit settings for logging. When rate-limiting is
// disabled only the mode is shown; otherwise QPS and BucketSize follow it.
func (r RateLimit) String() string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "Mode:%s", r.Mode)
	if r.Mode == RateLimitModeDisabled {
		// QPS and BucketSize have no effect in this mode, so omit them.
		return sb.String()
	}
	fmt.Fprintf(&sb, "/QPS:%f/", r.QPS)
	fmt.Fprintf(&sb, "BucketSize:%d", r.BucketSize)
	return sb.String()
}
// Compile-time check that Values satisfies fmt.Stringer.
var _ fmt.Stringer = Values{}

// String returns the configuration as a single line of "Name:value" entries
// separated by "/". List-valued fields are joined with "|", and the RateLimit
// and DefaultReconcilePolicy values are wrapped in square brackets.
func (v Values) String() string {
	var builder strings.Builder
	fmt.Fprintf(&builder, "SubscriptionID:%s/", v.SubscriptionID)
	fmt.Fprintf(&builder, "TenantID:%s/", v.TenantID)
	fmt.Fprintf(&builder, "AdditionalTenants:%s/", strings.Join(v.AdditionalTenants, "|"))
	fmt.Fprintf(&builder, "ClientID:%s/", v.ClientID)
	fmt.Fprintf(&builder, "PodNamespace:%s/", v.PodNamespace)
	fmt.Fprintf(&builder, "OperatorMode:%s/", v.OperatorMode)
	fmt.Fprintf(&builder, "TargetNamespaces:%s/", strings.Join(v.TargetNamespaces, "|"))
	fmt.Fprintf(&builder, "SyncPeriod:%s/", v.SyncPeriod)
	fmt.Fprintf(&builder, "ResourceManagerEndpoint:%s/", v.ResourceManagerEndpoint)
	fmt.Fprintf(&builder, "ResourceManagerAudience:%s/", v.ResourceManagerAudience)
	fmt.Fprintf(&builder, "AzureAuthorityHost:%s/", v.AzureAuthorityHost)
	fmt.Fprintf(&builder, "UseWorkloadIdentityAuth:%t/", v.UseWorkloadIdentityAuth)
	fmt.Fprintf(&builder, "UserAgentSuffix:%s/", v.UserAgentSuffix)
	fmt.Fprintf(&builder, "MaxConcurrentReconciles:%d/", v.MaxConcurrentReconciles)
	// The trailing "/" keeps RateLimit from running into the next entry.
	fmt.Fprintf(&builder, "RateLimit:[%s]/", v.RateLimit.String())
	fmt.Fprintf(&builder, "DefaultReconcilePolicy:[%s]", v.DefaultReconcilePolicy)
	return builder.String()
}
// Cloud translates the AzureAuthorityHost, ResourceManagerEndpoint and
// ResourceManagerAudience settings into an Azure SDK cloud.Configuration
// by way of asocloud.Configuration.
func (v Values) Cloud() cloud.Configuration {
	settings := asocloud.Configuration{
		ResourceManagerEndpoint: v.ResourceManagerEndpoint,
		ResourceManagerAudience: v.ResourceManagerAudience,
		AzureAuthorityHost:      v.AzureAuthorityHost,
	}
	return settings.Cloud()
}
// ReadFromEnvironment loads configuration values from the operator's
// environment variables (names are defined in the common config package).
//
// Variables that are unset or empty fall back to defaults: OperatorModeBoth for
// the operator mode, a 1h sync period, the asocloud default endpoint, audience
// and authority host, DefaultMaxConcurrentReconciles, a disabled rate limiter
// with QPS 5 and bucket size 100, and annotations.ReconcilePolicyManage as the
// default reconcile policy.
//
// If any value cannot be parsed, the zero Values is returned together with the
// error, so callers never observe a partially populated configuration.
//
// Validate is deliberately not called here to support using this from tests
// where consistent settings are not required; use ReadAndValidate for that.
func ReadFromEnvironment() (Values, error) {
	var result Values
	var err error

	result.OperatorMode = OperatorModeBoth
	if modeValue := os.Getenv(config.OperatorMode); modeValue != "" {
		result.OperatorMode, err = ParseOperatorMode(modeValue)
		if err != nil {
			return Values{}, err
		}
	}

	result.SubscriptionID = os.Getenv(config.AzureSubscriptionID)
	result.PodNamespace = os.Getenv(config.PodNamespace)
	result.TargetNamespaces = config.ParseCommaCollection(os.Getenv(config.TargetNamespaces))

	result.SyncPeriod, err = parseSyncPeriod()
	if err != nil {
		return Values{}, eris.Wrapf(err, "parsing %q", config.SyncPeriod)
	}

	result.ResourceManagerEndpoint = envOrDefault(config.ResourceManagerEndpoint, asocloud.DefaultEndpoint)
	result.ResourceManagerAudience = envOrDefault(config.ResourceManagerAudience, asocloud.DefaultAudience)
	result.AzureAuthorityHost = envOrDefault(config.AzureAuthorityHost, asocloud.DefaultAADAuthorityHost)
	result.ClientID = os.Getenv(config.AzureClientID)
	result.TenantID = os.Getenv(config.AzureTenantID)
	result.AdditionalTenants = config.ParseCommaCollection(os.Getenv(config.AzureAdditionalTenants))

	// envParseOrDefault already names the variable and value in its errors.
	result.MaxConcurrentReconciles, err = envParseOrDefault(config.MaxConcurrentReconciles, DefaultMaxConcurrentReconciles)
	if err != nil {
		return Values{}, err
	}

	// Ignoring error here, as any other value or empty value means we should default to false
	result.UseWorkloadIdentityAuth, _ = strconv.ParseBool(os.Getenv(config.UseWorkloadIdentityAuth))
	result.UserAgentSuffix = os.Getenv(config.UserAgentSuffix)

	result.RateLimit.Mode, err = ParseRateLimitMode(envOrDefault(config.RateLimitMode, string(RateLimitModeDisabled)))
	if err != nil {
		// ParseRateLimitMode doesn't know which variable it was given, so add it here.
		return Values{}, eris.Wrapf(err, "parsing %q", config.RateLimitMode)
	}
	result.RateLimit.QPS, err = envParseOrDefault(config.RateLimitQPS, 5.0)
	if err != nil {
		return Values{}, err
	}
	result.RateLimit.BucketSize, err = envParseOrDefault(config.RateLimitBucketSize, 100)
	if err != nil {
		return Values{}, err
	}

	result.DefaultReconcilePolicy = annotations.ReconcilePolicyValue(envOrDefault(config.DefaultReconcilePolicy, string(annotations.ReconcilePolicyManage)))

	return result, nil
}
// ReadAndValidate reads the configuration with ReadFromEnvironment and then
// checks it with Validate. If either step fails, the zero Values is returned
// along with that step's error.
func ReadAndValidate() (Values, error) {
	values, err := ReadFromEnvironment()
	if err == nil {
		err = values.Validate()
	}
	if err != nil {
		return Values{}, err
	}
	return values, nil
}
// Validate checks whether the configuration settings are consistent.
// It returns an error when:
//   - PodNamespace is empty,
//   - TargetNamespaces is set but OperatorMode does not include watchers,
//   - MaxConcurrentReconciles is less than 1, or
//   - DefaultReconcilePolicy is not one of detach-on-delete, manage or skip.
func (v Values) Validate() error {
	if v.PodNamespace == "" {
		return eris.Errorf("missing value for %s", config.PodNamespace)
	}

	if !v.OperatorMode.IncludesWatchers() && len(v.TargetNamespaces) > 0 {
		// It's the operator mode that is wrong here, so name it first.
		return eris.Errorf("%s must include watchers to specify %s", config.OperatorMode, config.TargetNamespaces)
	}

	if v.MaxConcurrentReconciles <= 0 {
		return eris.Errorf("%s must be at least 1", config.MaxConcurrentReconciles)
	}

	switch v.DefaultReconcilePolicy {
	case annotations.ReconcilePolicyDetachOnDelete, annotations.ReconcilePolicyManage, annotations.ReconcilePolicySkip:
		// supported
	default:
		return eris.Errorf("%s must be set to any of (%s, %s, %s)", config.DefaultReconcilePolicy, annotations.ReconcilePolicyDetachOnDelete, annotations.ReconcilePolicyManage, annotations.ReconcilePolicySkip)
	}

	return nil
}
// parseSyncPeriod reads the sync period from the config.SyncPeriod environment
// variable, defaulting to "1h" when it is unset or empty. The special value
// "never" disables periodic syncing and yields a nil duration with no error;
// any other value must be accepted by time.ParseDuration.
func parseSyncPeriod() (*time.Duration, error) {
	switch raw := envOrDefault(config.SyncPeriod, "1h"); raw {
	case "never":
		// magical string that means no sync
		return nil, nil
	default:
		period, err := time.ParseDuration(raw)
		if err != nil {
			return nil, err
		}
		return &period, nil
	}
}
// envParseOrDefault reads the environment variable env and converts it to T.
// If the variable is unset or set to the empty string, def is returned with a
// nil error. If the value cannot be parsed as T, def is returned together with
// an error naming both the value and the variable.
func envParseOrDefault[T int | string | float64](env string, def T) (T, error) {
	raw, ok := os.LookupEnv(env)
	if !ok || raw == "" {
		return def, nil
	}

	var parsed any
	switch any(def).(type) {
	case string:
		parsed = raw
	case int:
		n, err := strconv.Atoi(raw)
		if err != nil {
			return def, eris.Wrapf(err, "failed to parse value %q for %q", raw, env)
		}
		parsed = n
	case float64:
		f, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			return def, eris.Wrapf(err, "failed to parse value %q for %q", raw, env)
		}
		parsed = f
	default:
		// Unreachable given the type constraint, kept as a safeguard.
		return def, eris.Errorf("can't read unsupported type %T from env", def)
	}
	return parsed.(T), nil
}
// envOrDefault returns the value of the environment variable env, or def when
// the variable is unset or set to the empty string.
func envOrDefault(env string, def string) string {
	// os.Getenv reports an unset variable as "", so one emptiness check
	// covers both the "unset" and "set but empty" cases.
	if value := os.Getenv(env); value != "" {
		return value
	}
	return def
}