pilot/pkg/model/push_context.go (1,585 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package model import ( "encoding/json" "fmt" "math" "sort" "strconv" "strings" "sync" "time" ) import ( "go.uber.org/atomic" extensions "istio.io/api/extensions/v1alpha1" meshconfig "istio.io/api/mesh/v1alpha1" networking "istio.io/api/networking/v1alpha3" "istio.io/pkg/monitoring" "k8s.io/apimachinery/pkg/types" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" "github.com/apache/dubbo-go-pixiu/pkg/cluster" "github.com/apache/dubbo-go-pixiu/pkg/config" "github.com/apache/dubbo-go-pixiu/pkg/config/constants" "github.com/apache/dubbo-go-pixiu/pkg/config/host" "github.com/apache/dubbo-go-pixiu/pkg/config/labels" "github.com/apache/dubbo-go-pixiu/pkg/config/protocol" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" "github.com/apache/dubbo-go-pixiu/pkg/config/visibility" "github.com/apache/dubbo-go-pixiu/pkg/util/sets" ) // Metrics is an interface for capturing metrics on a per-node basis. type Metrics interface { // AddMetric will add an case to the metric for the given node. AddMetric(metric monitoring.Metric, key string, proxyID, msg string) } var _ Metrics = &PushContext{} // serviceIndex is an index of all services by various fields for easy access during push. type serviceIndex struct { // privateByNamespace are services that can reachable within the same namespace, with exportTo "." 
privateByNamespace map[string][]*Service // public are services reachable within the mesh with exportTo "*" public []*Service // exportedToNamespace are services that were made visible to this namespace // by an exportTo explicitly specifying this namespace. exportedToNamespace map[string][]*Service // HostnameAndNamespace has all services, indexed by hostname then namespace. HostnameAndNamespace map[host.Name]map[string]*Service `json:"-"` // instancesByPort contains a map of service key and instances by port. It is stored here // to avoid recomputations during push. This caches instanceByPort calls with empty labels. // Call InstancesByPort directly when instances need to be filtered by actual labels. instancesByPort map[string]map[int][]*ServiceInstance } func newServiceIndex() serviceIndex { return serviceIndex{ public: []*Service{}, privateByNamespace: map[string][]*Service{}, exportedToNamespace: map[string][]*Service{}, HostnameAndNamespace: map[host.Name]map[string]*Service{}, instancesByPort: map[string]map[int][]*ServiceInstance{}, } } // exportToDefaults contains the default exportTo values. type exportToDefaults struct { service map[visibility.Instance]bool virtualService map[visibility.Instance]bool destinationRule map[visibility.Instance]bool } // virtualServiceIndex is the index of virtual services by various fields. type virtualServiceIndex struct { exportedToNamespaceByGateway map[string]map[string][]config.Config // this contains all the virtual services with exportTo "." and current namespace. The keys are namespace,gateway. 
privateByNamespaceAndGateway map[string]map[string][]config.Config // This contains all virtual services whose exportTo is "*", keyed by gateway publicByGateway map[string][]config.Config // root vs namespace/name ->delegate vs virtualservice gvk/namespace/name delegates map[ConfigKey][]ConfigKey } func newVirtualServiceIndex() virtualServiceIndex { return virtualServiceIndex{ publicByGateway: map[string][]config.Config{}, privateByNamespaceAndGateway: map[string]map[string][]config.Config{}, exportedToNamespaceByGateway: map[string]map[string][]config.Config{}, delegates: map[ConfigKey][]ConfigKey{}, } } // destinationRuleIndex is the index of destination rules by various fields. type destinationRuleIndex struct { // namespaceLocal contains all public/private dest rules pertaining to a service defined in a given namespace. namespaceLocal map[string]*consolidatedDestRules // exportedByNamespace contains all dest rules pertaining to a service exported by a namespace. exportedByNamespace map[string]*consolidatedDestRules rootNamespaceLocal *consolidatedDestRules // mesh/namespace dest rules to be inherited inheritedByNamespace map[string]*consolidatedDestRule } func newDestinationRuleIndex() destinationRuleIndex { return destinationRuleIndex{ namespaceLocal: map[string]*consolidatedDestRules{}, exportedByNamespace: map[string]*consolidatedDestRules{}, inheritedByNamespace: map[string]*consolidatedDestRule{}, } } // sidecarIndex is the index of sidecar rules type sidecarIndex struct { // sidecars for each namespace sidecarsByNamespace map[string][]*SidecarScope // the Sidecar for the root namespace (if present). This applies to any namespace without its own Sidecar. rootConfig *config.Config // computedSidecarsByNamespace contains the default sidecar for namespaces that do not have a sidecar. // These may be DefaultSidecarScopeForNamespace if rootConfig is empty or ConvertToSidecarScope if not. // These are lazy-loaded. 
Access protected by defaultSidecarMu computedSidecarsByNamespace map[string]*SidecarScope // gatewayDefaultSidecarsByNamespace contains the default sidecar for namespaces that do not have a sidecar, // for gateways. // Unlike computedSidecarsByNamespace, this is *always* the output of DefaultSidecarScopeForNamespace. // These are lazy-loaded. Access protected by defaultSidecarMu gatewayDefaultSidecarsByNamespace map[string]*SidecarScope defaultSidecarMu *sync.Mutex } func newSidecarIndex() sidecarIndex { return sidecarIndex{ sidecarsByNamespace: map[string][]*SidecarScope{}, computedSidecarsByNamespace: map[string]*SidecarScope{}, gatewayDefaultSidecarsByNamespace: map[string]*SidecarScope{}, defaultSidecarMu: &sync.Mutex{}, } } // gatewayIndex is the index of gateways by various fields. type gatewayIndex struct { // namespace contains gateways by namespace. namespace map[string][]config.Config // all contains all gateways. all []config.Config } func newGatewayIndex() gatewayIndex { return gatewayIndex{ namespace: map[string][]config.Config{}, all: []config.Config{}, } } // serviceMetadataIndex is the index of service metadata by various fields. type serviceMetadataIndex struct { namespace map[string][]*config.Config applicationNameByNamespace map[string]map[string]*config.Config all []*config.Config } func newServiceMetadataIndex() serviceMetadataIndex { return serviceMetadataIndex{ namespace: map[string][]*config.Config{}, applicationNameByNamespace: map[string]map[string]*config.Config{}, all: []*config.Config{}, } } // serviceNameMappingIndex is the index of Service Name Mapping by various fields. type serviceNameMappingIndex struct { // namespace contains Service Name Mapping by namespace. namespace map[string][]*config.Config // interface by namespace interfaceByNamespace map[string]map[string]*config.Config // all contains all Service Name Mapping. 
all []*config.Config } func newServiceNameMappingIndex() serviceNameMappingIndex { return serviceNameMappingIndex{ namespace: map[string][]*config.Config{}, interfaceByNamespace: map[string]map[string]*config.Config{}, all: []*config.Config{}, } } // PushContext tracks the status of a push - metrics and errors. // Metrics are reset after a push - at the beginning all // values are zero, and when push completes the status is reset. // The struct is exposed in a debug endpoint - fields public to allow // easy serialization as json. type PushContext struct { proxyStatusMutex sync.RWMutex // ProxyStatus is keyed by the error code, and holds a map keyed // by the ID. ProxyStatus map[string]map[string]ProxyPushStatus // Synthesized from env.Mesh exportToDefaults exportToDefaults // ServiceIndex is the index of services by various fields. ServiceIndex serviceIndex // ServiceAccounts contains a map of hostname and port to service accounts. ServiceAccounts map[host.Name]map[int][]string `json:"-"` // virtualServiceIndex is the index of virtual services by various fields. virtualServiceIndex virtualServiceIndex // destinationRuleIndex is the index of destination rules by various fields. destinationRuleIndex destinationRuleIndex // gatewayIndex is the index of gateways. gatewayIndex gatewayIndex // clusterLocalHosts extracted from the MeshConfig clusterLocalHosts ClusterLocalHosts // sidecarIndex stores sidecar resources sidecarIndex sidecarIndex // serviceMetadataIndex stores service metadata resources serviceMetadataIndex serviceMetadataIndex // serviceNameMappingIndex is the index of service name mapping. serviceNameMappingIndex serviceNameMappingIndex // envoy filters for each namespace including global config namespace envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper // wasm plugins for each namespace including global config namespace wasmPluginsByNamespace map[string][]*WasmPluginWrapper // AuthnPolicies contains Authn policies by namespace. 
AuthnPolicies *AuthenticationPolicies `json:"-"` // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there // are no authorization policies in the cluster. AuthzPolicies *AuthorizationPolicies `json:"-"` // Telemetry stores the existing Telemetry resources for the cluster. Telemetry *Telemetries `json:"-"` // ProxyConfig stores the existing ProxyConfig resources for the cluster. ProxyConfigs *ProxyConfigs `json:"-"` // The following data is either a global index or used in the inbound path. // Namespace specific views do not apply here. // Mesh configuration for the mesh. Mesh *meshconfig.MeshConfig `json:"-"` // PushVersion describes the push version this push context was computed for PushVersion string // LedgerVersion is the version of the configuration ledger LedgerVersion string // JwtKeyResolver holds a reference to the JWT key resolver instance. JwtKeyResolver *JwksResolver // GatewayAPIController holds a reference to the gateway API controller. GatewayAPIController GatewayController // cache gateways addresses for each network // this is mainly used for kubernetes multi-cluster scenario networkMgr *NetworkManager InitDone atomic.Bool initializeMutex sync.Mutex } type consolidatedDestRules struct { // Map of dest rule host to the list of namespaces to which this destination rule has been exported to exportTo map[host.Name]map[visibility.Instance]bool // Map of dest rule host and the merged destination rules for that host destRules map[host.Name][]*consolidatedDestRule } // consolidatedDestRule represents a dr and from which it is consolidated. type consolidatedDestRule struct { // rule is merged from the following destinationRules. rule *config.Config // the original dest rules from which above rule is merged. from []types.NamespacedName } // XDSUpdater is used for direct updates of the xDS model and incremental push. // Pilot uses multiple registries - for example each K8S cluster is a registry // instance. 
Each registry is responsible for tracking a set // of endpoints associated with mesh services, and calling the EDSUpdate on changes. // A registry may group endpoints for a service in smaller subsets - for example by // deployment, or to deal with very large number of endpoints for a service. We want // to avoid passing around large objects - like full list of endpoints for a registry, // or the full list of endpoints for a service across registries, since it limits // scalability. // // Future optimizations will include grouping the endpoints by labels, gateway or region to // reduce the time when subsetting or split-horizon is used. This design assumes pilot // tracks all endpoints in the mesh and they fit in RAM - so limit is few M endpoints. // It is possible to split the endpoint tracking in future. type XDSUpdater interface { // EDSUpdate is called when the list of endpoints or labels in a Service is changed. // For each cluster and hostname, the full list of active endpoints (including empty list) // must be sent. The shard name is used as a key - current implementation is using the // registry name. EDSUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint) // EDSCacheUpdate is called when the list of endpoints or labels in a Service is changed. // For each cluster and hostname, the full list of active endpoints (including empty list) // must be sent. The shard name is used as a key - current implementation is using the // registry name. // Note: the difference with `EDSUpdate` is that it only update the cache rather than requesting a push EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint) // SvcUpdate is called when a service definition is updated/deleted. SvcUpdate(shard ShardKey, hostname string, namespace string, event Event) // ConfigUpdate is called to notify the XDS server of config updates and request a push. // The requests may be collapsed and throttled. 
ConfigUpdate(req *PushRequest) // ProxyUpdate is called to notify the XDS server to send a push to the specified proxy. // The requests may be collapsed and throttled. ProxyUpdate(clusterID cluster.ID, ip string) // RemoveShard removes all endpoints for the given shard key RemoveShard(shardKey ShardKey) } // PushRequest defines a request to push to proxies // It is used to send updates to the config update debouncer and pass to the PushQueue. type PushRequest struct { // Full determines whether a full push is required or not. If false, an incremental update will be sent. // Incremental pushes: // * Do not recompute the push context // * Do not recompute proxy state (such as ServiceInstances) // * Are not reported in standard metrics such as push time // As a result, configuration updates should never be incremental. Generally, only EDS will set this, but // in the future SDS will as well. Full bool // ConfigsUpdated keeps track of configs that have changed. // This is used as an optimization to avoid unnecessary pushes to proxies that are scoped with a Sidecar. // If this is empty, then all proxies will get an update. // Otherwise only proxies depend on these configs will get an update. // The kind of resources are defined in pkg/config/schemas. ConfigsUpdated map[ConfigKey]struct{} // Push stores the push context to use for the update. This may initially be nil, as we will // debounce changes before a PushContext is eventually created. Push *PushContext // Start represents the time a push was started. This represents the time of adding to the PushQueue. // Note that this does not include time spent debouncing. Start time.Time // Reason represents the reason for requesting a push. This should only be a fixed set of values, // to avoid unbounded cardinality in metrics. If this is not set, it may be automatically filled in later. 
// There should only be multiple reasons if the push request is the result of two distinct triggers, rather than // classifying a single trigger as having multiple reasons. Reason []TriggerReason // Delta defines the resources that were added or removed as part of this push request. // This is set only on requests from the client which change the set of resources they (un)subscribe from. Delta ResourceDelta } // ResourceDelta records the difference in requested resources by an XDS client type ResourceDelta struct { // Subscribed indicates the client requested these additional resources Subscribed sets.Set // Unsubscribed indicates the client no longer requires these resources Unsubscribed sets.Set } func (rd ResourceDelta) IsEmpty() bool { return len(rd.Subscribed) == 0 && len(rd.Unsubscribed) == 0 } type TriggerReason string // If adding a new reason, update xds/monitoring.go:triggerMetric const ( // EndpointUpdate describes a push triggered by an Endpoint change EndpointUpdate TriggerReason = "endpoint" // ConfigUpdate describes a push triggered by a config (generally and Istio CRD) change. 
ConfigUpdate TriggerReason = "config" // ServiceUpdate describes a push triggered by a Service change ServiceUpdate TriggerReason = "service" // ProxyUpdate describes a push triggered by a change to an individual proxy (such as label change) ProxyUpdate TriggerReason = "proxy" // GlobalUpdate describes a push triggered by a change to global config, such as mesh config GlobalUpdate TriggerReason = "global" // UnknownTrigger describes a push triggered by an unknown reason UnknownTrigger TriggerReason = "unknown" // DebugTrigger describes a push triggered for debugging DebugTrigger TriggerReason = "debug" // SecretTrigger describes a push triggered for a Secret change SecretTrigger TriggerReason = "secret" // NetworksTrigger describes a push triggered for Networks change NetworksTrigger TriggerReason = "networks" // ProxyRequest describes a push triggered based on proxy request ProxyRequest TriggerReason = "proxyrequest" // NamespaceUpdate describes a push triggered by a Namespace change NamespaceUpdate TriggerReason = "namespace" // ClusterUpdate describes a push triggered by a Cluster change ClusterUpdate TriggerReason = "cluster" ) // Merge two update requests together // Merge behaves similarly to a list append; usage should in the form `a = a.merge(b)`. // Importantly, Merge may decide to allocate a new PushRequest object or reuse the existing one - both // inputs should not be used after completion. func (pr *PushRequest) Merge(other *PushRequest) *PushRequest { if pr == nil { return other } if other == nil { return pr } // Keep the first (older) start time // Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count pr.Reason = append(pr.Reason, other.Reason...) 
// If either is full we need a full push pr.Full = pr.Full || other.Full // The other push context is presumed to be later and more up to date if other.Push != nil { pr.Push = other.Push } // Do not merge when any one is empty if len(pr.ConfigsUpdated) == 0 || len(other.ConfigsUpdated) == 0 { pr.ConfigsUpdated = nil } else { for conf := range other.ConfigsUpdated { pr.ConfigsUpdated[conf] = struct{}{} } } return pr } // CopyMerge two update requests together. Unlike Merge, this will not mutate either input. // This should be used when we are modifying a shared PushRequest (typically any time it's in the context // of a single proxy) func (pr *PushRequest) CopyMerge(other *PushRequest) *PushRequest { if pr == nil { return other } if other == nil { return pr } var reason []TriggerReason if len(pr.Reason)+len(other.Reason) > 0 { reason = make([]TriggerReason, 0, len(pr.Reason)+len(other.Reason)) reason = append(reason, pr.Reason...) reason = append(reason, other.Reason...) } merged := &PushRequest{ // Keep the first (older) start time Start: pr.Start, // If either is full we need a full push Full: pr.Full || other.Full, // The other push context is presumed to be later and more up to date Push: other.Push, // Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count Reason: reason, } // Do not merge when any one is empty if len(pr.ConfigsUpdated) > 0 && len(other.ConfigsUpdated) > 0 { merged.ConfigsUpdated = make(map[ConfigKey]struct{}, len(pr.ConfigsUpdated)+len(other.ConfigsUpdated)) for conf := range pr.ConfigsUpdated { merged.ConfigsUpdated[conf] = struct{}{} } for conf := range other.ConfigsUpdated { merged.ConfigsUpdated[conf] = struct{}{} } } return merged } func (pr *PushRequest) PushReason() string { if len(pr.Reason) == 1 && pr.Reason[0] == ProxyRequest { return " request" } return "" } // ProxyPushStatus represents an event captured during config push to proxies. // It may contain additional message and the affected proxy. 
type ProxyPushStatus struct { Proxy string `json:"proxy,omitempty"` Message string `json:"message,omitempty"` } // AddMetric will add an case to the metric. func (ps *PushContext) AddMetric(metric monitoring.Metric, key string, proxyID, msg string) { if ps == nil { log.Infof("Metric without context %s %v %s", key, proxyID, msg) return } ps.proxyStatusMutex.Lock() defer ps.proxyStatusMutex.Unlock() metricMap, f := ps.ProxyStatus[metric.Name()] if !f { metricMap = map[string]ProxyPushStatus{} ps.ProxyStatus[metric.Name()] = metricMap } ev := ProxyPushStatus{Message: msg, Proxy: proxyID} metricMap[key] = ev } var ( // EndpointNoPod tracks endpoints without an associated pod. This is an error condition, since // we can't figure out the labels. It may be a transient problem, if endpoint is processed before // pod. EndpointNoPod = monitoring.NewGauge( "endpoint_no_pod", "Endpoints without an associated pod.", ) // ProxyStatusNoService represents proxies not selected by any service // This can be normal - for workloads that act only as client, or are not covered by a Service. // It can also be an error, for example in cases the Endpoint list of a service was not updated by the time // the sidecar calls. // Updated by GetProxyServiceInstances ProxyStatusNoService = monitoring.NewGauge( "pilot_no_ip", "Pods not found in the endpoint table, possibly invalid.", ) // ProxyStatusEndpointNotReady represents proxies found not be ready. // Updated by GetProxyServiceInstances. Normal condition when starting // an app with readiness, error if it doesn't change to 0. 
ProxyStatusEndpointNotReady = monitoring.NewGauge( "pilot_endpoint_not_ready", "Endpoint found in unready state.", ) // ProxyStatusConflictOutboundListenerTCPOverHTTP metric tracks number of // wildcard TCP listeners that conflicted with existing wildcard HTTP listener on same port ProxyStatusConflictOutboundListenerTCPOverHTTP = monitoring.NewGauge( "pilot_conflict_outbound_listener_tcp_over_current_http", "Number of conflicting wildcard tcp listeners with current wildcard http listener.", ) // ProxyStatusConflictOutboundListenerTCPOverTCP metric tracks number of // TCP listeners that conflicted with existing TCP listeners on same port ProxyStatusConflictOutboundListenerTCPOverTCP = monitoring.NewGauge( "pilot_conflict_outbound_listener_tcp_over_current_tcp", "Number of conflicting tcp listeners with current tcp listener.", ) // ProxyStatusConflictOutboundListenerHTTPOverTCP metric tracks number of // wildcard HTTP listeners that conflicted with existing wildcard TCP listener on same port ProxyStatusConflictOutboundListenerHTTPOverTCP = monitoring.NewGauge( "pilot_conflict_outbound_listener_http_over_current_tcp", "Number of conflicting wildcard http listeners with current wildcard tcp listener.", ) // ProxyStatusConflictInboundListener tracks cases of multiple inbound // listeners - 2 services selecting the same port of the pod. 
ProxyStatusConflictInboundListener = monitoring.NewGauge( "pilot_conflict_inbound_listener", "Number of conflicting inbound listeners.", ) // DuplicatedClusters tracks duplicate clusters seen while computing CDS DuplicatedClusters = monitoring.NewGauge( "pilot_duplicate_envoy_clusters", "Duplicate envoy clusters caused by service entries with same hostname", ) // DNSNoEndpointClusters tracks dns clusters without endpoints DNSNoEndpointClusters = monitoring.NewGauge( "pilot_dns_cluster_without_endpoints", "DNS clusters without endpoints caused by the endpoint field in "+ "STRICT_DNS type cluster is not set or the corresponding subset cannot select any endpoint", ) // ProxyStatusClusterNoInstances tracks clusters (services) without workloads. ProxyStatusClusterNoInstances = monitoring.NewGauge( "pilot_eds_no_instances", "Number of clusters without instances.", ) // DuplicatedDomains tracks rejected VirtualServices due to duplicated hostname. DuplicatedDomains = monitoring.NewGauge( "pilot_vservice_dup_domain", "Virtual services with dup domains.", ) // DuplicatedSubsets tracks duplicate subsets that we rejected while merging multiple destination rules for same host DuplicatedSubsets = monitoring.NewGauge( "pilot_destrule_subsets", "Duplicate subsets across destination rules for same host", ) // totalVirtualServices tracks the total number of virtual service totalVirtualServices = monitoring.NewGauge( "pilot_virt_services", "Total virtual services known to pilot.", ) // LastPushStatus preserves the metrics and data collected during lasts global push. // It can be used by debugging tools to inspect the push event. It will be reset after each push with the // new version. LastPushStatus *PushContext // LastPushMutex will protect the LastPushStatus LastPushMutex sync.Mutex // All metrics we registered. 
metrics = []monitoring.Metric{ EndpointNoPod, ProxyStatusNoService, ProxyStatusEndpointNotReady, ProxyStatusConflictOutboundListenerTCPOverHTTP, ProxyStatusConflictOutboundListenerTCPOverTCP, ProxyStatusConflictOutboundListenerHTTPOverTCP, ProxyStatusConflictInboundListener, DuplicatedClusters, ProxyStatusClusterNoInstances, DuplicatedDomains, DuplicatedSubsets, } ) func init() { for _, m := range metrics { monitoring.MustRegister(m) } monitoring.MustRegister(totalVirtualServices) } // NewPushContext creates a new PushContext structure to track push status. func NewPushContext() *PushContext { return &PushContext{ ServiceIndex: newServiceIndex(), virtualServiceIndex: newVirtualServiceIndex(), destinationRuleIndex: newDestinationRuleIndex(), sidecarIndex: newSidecarIndex(), envoyFiltersByNamespace: map[string][]*EnvoyFilterWrapper{}, gatewayIndex: newGatewayIndex(), ProxyStatus: map[string]map[string]ProxyPushStatus{}, ServiceAccounts: map[host.Name]map[int][]string{}, serviceMetadataIndex: newServiceMetadataIndex(), serviceNameMappingIndex: newServiceNameMappingIndex(), } } // AddPublicServices adds the services to context public services - mainly used in tests. func (ps *PushContext) AddPublicServices(services []*Service) { ps.ServiceIndex.public = append(ps.ServiceIndex.public, services...) } // AddServiceInstances adds instances to the context service instances - mainly used in tests. func (ps *PushContext) AddServiceInstances(service *Service, instances map[int][]*ServiceInstance) { svcKey := service.Key() for port, inst := range instances { if _, exists := ps.ServiceIndex.instancesByPort[svcKey]; !exists { ps.ServiceIndex.instancesByPort[svcKey] = make(map[int][]*ServiceInstance) } ps.ServiceIndex.instancesByPort[svcKey][port] = append(ps.ServiceIndex.instancesByPort[svcKey][port], inst...) } } // StatusJSON implements json.Marshaller, with a lock. 
func (ps *PushContext) StatusJSON() ([]byte, error) { if ps == nil { return []byte{'{', '}'}, nil } ps.proxyStatusMutex.RLock() defer ps.proxyStatusMutex.RUnlock() return json.MarshalIndent(ps.ProxyStatus, "", " ") } // OnConfigChange is called when a config change is detected. func (ps *PushContext) OnConfigChange() { LastPushMutex.Lock() LastPushStatus = ps LastPushMutex.Unlock() ps.UpdateMetrics() } // UpdateMetrics will update the prometheus metrics based on the // current status of the push. func (ps *PushContext) UpdateMetrics() { ps.proxyStatusMutex.RLock() defer ps.proxyStatusMutex.RUnlock() for _, pm := range metrics { mmap := ps.ProxyStatus[pm.Name()] pm.Record(float64(len(mmap))) } } // It is called after virtual service short host name is resolved to FQDN func virtualServiceDestinationHosts(v *networking.VirtualService) []string { if v == nil { return nil } var out []string for _, h := range v.Http { for _, r := range h.Route { if r.Destination != nil { out = append(out, r.Destination.Host) } } if h.Mirror != nil { out = append(out, h.Mirror.Host) } } for _, t := range v.Tcp { for _, r := range t.Route { if r.Destination != nil { out = append(out, r.Destination.Host) } } } for _, t := range v.Tls { for _, r := range t.Route { if r.Destination != nil { out = append(out, r.Destination.Host) } } } return out } // GatewayServices returns the set of services which are referred from the proxy gateways. func (ps *PushContext) GatewayServices(proxy *Proxy) []*Service { svcs := proxy.SidecarScope.services // MergedGateway will be nil when there are no configs in the // system during initial installation. if proxy.MergedGateway == nil { return nil } // host set. 
hostsFromGateways := sets.New() for _, gw := range proxy.MergedGateway.GatewayNameForServer { for _, vsConfig := range ps.VirtualServicesForGateway(proxy.ConfigNamespace, gw) { vs, ok := vsConfig.Spec.(*networking.VirtualService) if !ok { // should never happen log.Errorf("Failed in getting a virtual service: %v", vsConfig.Labels) return svcs } for _, host := range virtualServiceDestinationHosts(vs) { hostsFromGateways.Insert(host) } } } hostsFromMeshConfig := getHostsFromMeshConfig(ps) hostsFromGateways.Merge(hostsFromMeshConfig) log.Debugf("GatewayServices: gateway %v is exposing these hosts:%v", proxy.ID, hostsFromGateways) gwSvcs := make([]*Service, 0, len(svcs)) for _, s := range svcs { svcHost := string(s.Hostname) if _, ok := hostsFromGateways[svcHost]; ok { gwSvcs = append(gwSvcs, s) } } log.Debugf("GatewayServices:: gateways len(services)=%d, len(filtered)=%d", len(svcs), len(gwSvcs)) return gwSvcs } // add services from MeshConfig.ExtensionProviders // TODO: include cluster from EnvoyFilter such as global ratelimit [demo](https://istio.io/latest/docs/tasks/policy-enforcement/rate-limit/#global-rate-limit) func getHostsFromMeshConfig(ps *PushContext) sets.Set { hostsFromMeshConfig := sets.New() for _, prov := range ps.Mesh.ExtensionProviders { switch p := prov.Provider.(type) { case *meshconfig.MeshConfig_ExtensionProvider_EnvoyExtAuthzHttp: hostsFromMeshConfig.Insert(p.EnvoyExtAuthzHttp.Service) case *meshconfig.MeshConfig_ExtensionProvider_EnvoyExtAuthzGrpc: hostsFromMeshConfig.Insert(p.EnvoyExtAuthzGrpc.Service) case *meshconfig.MeshConfig_ExtensionProvider_Zipkin: hostsFromMeshConfig.Insert(p.Zipkin.Service) case *meshconfig.MeshConfig_ExtensionProvider_Lightstep: hostsFromMeshConfig.Insert(p.Lightstep.Service) case *meshconfig.MeshConfig_ExtensionProvider_Datadog: hostsFromMeshConfig.Insert(p.Datadog.Service) case *meshconfig.MeshConfig_ExtensionProvider_Opencensus: hostsFromMeshConfig.Insert(p.Opencensus.Service) case 
*meshconfig.MeshConfig_ExtensionProvider_Skywalking: hostsFromMeshConfig.Insert(p.Skywalking.Service) case *meshconfig.MeshConfig_ExtensionProvider_EnvoyHttpAls: hostsFromMeshConfig.Insert(p.EnvoyHttpAls.Service) case *meshconfig.MeshConfig_ExtensionProvider_EnvoyTcpAls: hostsFromMeshConfig.Insert(p.EnvoyTcpAls.Service) case *meshconfig.MeshConfig_ExtensionProvider_EnvoyOtelAls: hostsFromMeshConfig.Insert(p.EnvoyOtelAls.Service) } } return hostsFromMeshConfig } // servicesExportedToNamespace returns the list of services that are visible to a namespace. // namespace "" indicates all namespaces func (ps *PushContext) servicesExportedToNamespace(ns string) []*Service { out := make([]*Service, 0) // First add private services and explicitly exportedTo services if ns == NamespaceAll { for _, privateServices := range ps.ServiceIndex.privateByNamespace { out = append(out, privateServices...) } } else { out = append(out, ps.ServiceIndex.privateByNamespace[ns]...) out = append(out, ps.ServiceIndex.exportedToNamespace[ns]...) } // Second add public services out = append(out, ps.ServiceIndex.public...) return out } // GetAllServices returns the total services within the mesh. // Note: per proxy services should use SidecarScope.Services. func (ps *PushContext) GetAllServices() []*Service { return ps.servicesExportedToNamespace(NamespaceAll) } // ServiceForHostname returns the service associated with a given hostname following SidecarScope func (ps *PushContext) ServiceForHostname(proxy *Proxy, hostname host.Name) *Service { if proxy != nil && proxy.SidecarScope != nil { return proxy.SidecarScope.servicesByHostname[hostname] } // SidecarScope shouldn't be null here. If it is, we can't disambiguate the hostname to use for a namespace, // so the selection must be undefined. 
for _, service := range ps.ServiceIndex.HostnameAndNamespace[hostname] { return service } // No service found return nil } // IsServiceVisible returns true if the input service is visible to the given namespace. func (ps *PushContext) IsServiceVisible(service *Service, namespace string) bool { if service == nil { return false } ns := service.Attributes.Namespace if len(service.Attributes.ExportTo) == 0 { if ps.exportToDefaults.service[visibility.Private] { return ns == namespace } else if ps.exportToDefaults.service[visibility.Public] { return true } } return service.Attributes.ExportTo[visibility.Public] || (service.Attributes.ExportTo[visibility.Private] && ns == namespace) || service.Attributes.ExportTo[visibility.Instance(namespace)] } // VirtualServicesForGateway lists all virtual services bound to the specified gateways // This replaces store.VirtualServices. Used only by the gateways // Sidecars use the egressListener.VirtualServices(). func (ps *PushContext) VirtualServicesForGateway(proxyNamespace, gateway string) []config.Config { res := make([]config.Config, 0, len(ps.virtualServiceIndex.privateByNamespaceAndGateway[proxyNamespace][gateway])+ len(ps.virtualServiceIndex.exportedToNamespaceByGateway[proxyNamespace][gateway])+ len(ps.virtualServiceIndex.publicByGateway[gateway])) res = append(res, ps.virtualServiceIndex.privateByNamespaceAndGateway[proxyNamespace][gateway]...) res = append(res, ps.virtualServiceIndex.exportedToNamespaceByGateway[proxyNamespace][gateway]...) res = append(res, ps.virtualServiceIndex.publicByGateway[gateway]...) 
// getSidecarScope returns a SidecarScope object associated with the
// proxy. The SidecarScope object is a semi-processed view of the service
// registry, and config state associated with the sidecar crd. The scope contains
// a set of inbound and outbound listeners, services/configs per listener,
// etc. The sidecar scopes are precomputed in the initSidecarContext
// function based on the Sidecar API objects in each namespace. If there is
// no sidecar api object, a default sidecarscope is assigned to the
// namespace which enables connectivity to all services in the mesh.
//
// Selection order, as implemented below:
//  1. If the proxy's config namespace has precomputed scopes, a Router proxy
//     gets the first scope whose Sidecar is nil, and a SidecarProxy gets the
//     first scope that either has no workload selector, has a selector that
//     is a subset of workloadLabels, or has a nil Sidecar.
//  2. Otherwise a default scope is computed lazily, cached per namespace and
//     returned; the caches are guarded by defaultSidecarMu.
//
// Callers can check if the sidecarScope is from user generated object or not
// by checking the sidecarScope.Config field, that contains the user provided config
func (ps *PushContext) getSidecarScope(proxy *Proxy, workloadLabels labels.Instance) *SidecarScope {
	// Find the most specific matching sidecar config from the proxy's
	// config namespace. If none is found, construct a sidecarConfig on the fly
	// that allows the sidecar to talk to any namespace (the default
	// behavior in the absence of sidecars).
	if sidecars, ok := ps.sidecarIndex.sidecarsByNamespace[proxy.ConfigNamespace]; ok {
		// TODO: logic to merge multiple sidecar resources
		// Currently we assume that there will be only one sidecar config for a namespace.
		if proxy.Type == Router {
			for _, wrapper := range sidecars {
				// Gateways should just have a default scope with egress: */*
				if wrapper.Sidecar == nil {
					return wrapper
				}
			}
			// No scope with a nil Sidecar exists for this namespace: fall
			// through to the lazily computed gateway default below.
		}
		if proxy.Type == SidecarProxy {
			for _, wrapper := range sidecars {
				if wrapper.Sidecar != nil {
					sidecar := wrapper.Sidecar
					// if there is no workload selector, the config applies to all workloads
					// if there is a workload selector, check for matching workload labels
					if sidecar.GetWorkloadSelector() != nil {
						workloadSelector := labels.Instance(sidecar.GetWorkloadSelector().GetLabels())
						// skip scopes whose workload selector does not match this workload
						if !workloadSelector.SubsetOf(workloadLabels) {
							continue
						}
					}

					// it is guaranteed sidecars with selectors are put in front
					// and the sidecars are sorted by creation timestamp,
					// return exact/wildcard matching one directly
					return wrapper
				}
				// this happens at last, it is the default sidecar scope
				return wrapper
			}
		}
	}

	// We didn't have a Sidecar in the namespace. This means we should use the default - either an implicit
	// default selecting everything, or pulling from the root namespace.
	// The lock is held until return because both default caches below are read and written here.
	ps.sidecarIndex.defaultSidecarMu.Lock()
	defer ps.sidecarIndex.defaultSidecarMu.Unlock()
	if proxy.Type == Router {
		sc, f := ps.sidecarIndex.gatewayDefaultSidecarsByNamespace[proxy.ConfigNamespace]
		if f {
			// We have already computed the scope for this namespace, just fetch it
			return sc
		}
		// Routers always get the namespace default; the root namespace Sidecar is not consulted here.
		computed := DefaultSidecarScopeForNamespace(ps, proxy.ConfigNamespace)
		ps.sidecarIndex.gatewayDefaultSidecarsByNamespace[proxy.ConfigNamespace] = computed
		return computed
	}
	sc, f := ps.sidecarIndex.computedSidecarsByNamespace[proxy.ConfigNamespace]
	if f {
		// We have already computed the scope for this namespace, just fetch it
		return sc
	}
	// We need to compute this namespace
	var computed *SidecarScope
	if ps.sidecarIndex.rootConfig != nil {
		// A root namespace Sidecar exists: derive this namespace's scope from it.
		computed = ConvertToSidecarScope(ps, ps.sidecarIndex.rootConfig, proxy.ConfigNamespace)
	} else {
		computed = DefaultSidecarScopeForNamespace(ps, proxy.ConfigNamespace)
		// Even though we are a sidecar, we can store this as a gateway one since it could be used by a gateway
		ps.sidecarIndex.gatewayDefaultSidecarsByNamespace[proxy.ConfigNamespace] = computed
	}
	ps.sidecarIndex.computedSidecarsByNamespace[proxy.ConfigNamespace] = computed
	return computed
}
select destination rule from proxy config namespace if proxyNameSpace != ps.Mesh.RootNamespace { // search through the DestinationRules in proxy's namespace first if ps.destinationRuleIndex.namespaceLocal[proxyNameSpace] != nil { if hostname, ok := MostSpecificHostMatch(service.Hostname, ps.destinationRuleIndex.namespaceLocal[proxyNameSpace].destRules, ); ok { return ps.destinationRuleIndex.namespaceLocal[proxyNameSpace].destRules[hostname] } } } else { // If this is a namespace local DR in the same namespace, this must be meant for this proxy, so we do not // need to worry about overriding other DRs with *.local type rules here. If we ignore this, then exportTo=. in // root namespace would always be ignored if hostname, ok := MostSpecificHostMatch(service.Hostname, ps.destinationRuleIndex.rootNamespaceLocal.destRules, ); ok { return ps.destinationRuleIndex.rootNamespaceLocal.destRules[hostname] } } // 2. select destination rule from service namespace svcNs := service.Attributes.Namespace // This can happen when finding the subset labels for a proxy in root namespace. // Because based on a pure cluster's fqdn, we do not know the service and // construct a fake service without setting Attributes at all. if svcNs == "" { for _, svc := range ps.servicesExportedToNamespace(proxyNameSpace) { if service.Hostname == svc.Hostname && svc.Attributes.Namespace != "" { svcNs = svc.Attributes.Namespace break } } } // 3. if no private/public rule matched in the calling proxy's namespace, // check the target service's namespace for exported rules if svcNs != "" { if out := ps.getExportedDestinationRuleFromNamespace(svcNs, service.Hostname, proxyNameSpace); out != nil { return out } } // 4. 
if no public/private rule in calling proxy's namespace matched, and no public rule in the // target service's namespace matched, search for any exported destination rule in the config root namespace if out := ps.getExportedDestinationRuleFromNamespace(ps.Mesh.RootNamespace, service.Hostname, proxyNameSpace); out != nil { return out } // 5. service DestinationRules were merged in SetDestinationRules, return mesh/namespace rules if present if features.EnableDestinationRuleInheritance { // return namespace rule if present if out := ps.destinationRuleIndex.inheritedByNamespace[proxyNameSpace]; out != nil { return []*consolidatedDestRule{out} } // return mesh rule if out := ps.destinationRuleIndex.inheritedByNamespace[ps.Mesh.RootNamespace]; out != nil { return []*consolidatedDestRule{out} } } return nil } func (ps *PushContext) getExportedDestinationRuleFromNamespace(owningNamespace string, hostname host.Name, clientNamespace string) []*consolidatedDestRule { if ps.destinationRuleIndex.exportedByNamespace[owningNamespace] != nil { if specificHostname, ok := MostSpecificHostMatch(hostname, ps.destinationRuleIndex.exportedByNamespace[owningNamespace].destRules, ); ok { // Check if the dest rule for this host is actually exported to the proxy's (client) namespace exportToMap := ps.destinationRuleIndex.exportedByNamespace[owningNamespace].exportTo[specificHostname] if len(exportToMap) == 0 || exportToMap[visibility.Public] || exportToMap[visibility.Instance(clientNamespace)] { if features.EnableDestinationRuleInheritance { var parent *consolidatedDestRule // client inherits global DR from its own namespace, not from the exported DR's owning namespace // grab the client namespace DR or mesh if none exists if parent = ps.destinationRuleIndex.inheritedByNamespace[clientNamespace]; parent == nil { parent = ps.destinationRuleIndex.inheritedByNamespace[ps.Mesh.RootNamespace] } var inheritedDrList []*consolidatedDestRule for _, child := range 
ps.destinationRuleIndex.exportedByNamespace[owningNamespace].destRules[specificHostname] { inheritedDr := ps.inheritDestinationRule(parent, child) if inheritedDr != nil { inheritedDrList = append(inheritedDrList, inheritedDr) } } return inheritedDrList } if dr, ok := ps.destinationRuleIndex.exportedByNamespace[owningNamespace].destRules[specificHostname]; ok { return dr } } } } return nil } func (ps *PushContext) ServiceMetadata(namespace, applicationName, revision string) *config.Config { if conf, ok := ps.serviceMetadataIndex.applicationNameByNamespace[namespace][strings.ToLower(fmt.Sprintf("%s-%s", applicationName, revision))]; ok { return conf } return nil } // IsClusterLocal indicates whether the endpoints for the service should only be accessible to clients // within the cluster. func (ps *PushContext) IsClusterLocal(service *Service) bool { if service == nil { return false } return ps.clusterLocalHosts.IsClusterLocal(service.Hostname) } // InitContext will initialize the data structures used for code generation. // This should be called before starting the push, from the thread creating // the push context. func (ps *PushContext) InitContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) error { // Acquire a lock to ensure we don't concurrently initialize the same PushContext. 
// If this does happen, one thread will block then exit early from InitDone=true ps.initializeMutex.Lock() defer ps.initializeMutex.Unlock() if ps.InitDone.Load() { return nil } ps.Mesh = env.Mesh() ps.LedgerVersion = env.Version() // Must be initialized first // as initServiceRegistry/VirtualServices/Destrules // use the default export map ps.initDefaultExportMaps() // create new or incremental update if pushReq == nil || oldPushContext == nil || !oldPushContext.InitDone.Load() || len(pushReq.ConfigsUpdated) == 0 { if err := ps.createNewContext(env); err != nil { return err } } else { if err := ps.updateContext(env, oldPushContext, pushReq); err != nil { return err } } ps.networkMgr = env.NetworkManager ps.clusterLocalHosts = env.ClusterLocal().GetClusterLocalHosts() ps.InitDone.Store(true) return nil } func (ps *PushContext) createNewContext(env *Environment) error { if err := ps.initServiceRegistry(env); err != nil { return err } if err := ps.initKubernetesGateways(env); err != nil { return err } if err := ps.initVirtualServices(env); err != nil { return err } if err := ps.initDestinationRules(env); err != nil { return err } if err := ps.initAuthnPolicies(env); err != nil { return err } if err := ps.initAuthorizationPolicies(env); err != nil { authzLog.Errorf("failed to initialize authorization policies: %v", err) return err } if err := ps.initTelemetry(env); err != nil { return err } if err := ps.initProxyConfigs(env); err != nil { return err } if err := ps.initWasmPlugins(env); err != nil { return err } if err := ps.initEnvoyFilters(env); err != nil { return err } if err := ps.initGateways(env); err != nil { return err } if err := ps.initServiceMetadata(env); err != nil { return err } // Must be initialized in the end if err := ps.initSidecarScopes(env); err != nil { return err } // service name mapping context init if err := ps.initServiceNameMappings(env); err != nil { return err } return nil } func (ps *PushContext) updateContext( env *Environment, 
oldPushContext *PushContext, pushReq *PushRequest) error { var servicesChanged, virtualServicesChanged, destinationRulesChanged, gatewayChanged, authnChanged, authzChanged, envoyFiltersChanged, sidecarsChanged, telemetryChanged, gatewayAPIChanged, wasmPluginsChanged, proxyConfigsChanged, servicenamemappingsChanged bool for conf := range pushReq.ConfigsUpdated { switch conf.Kind { case gvk.ServiceEntry: servicesChanged = true case gvk.DestinationRule: destinationRulesChanged = true case gvk.VirtualService: virtualServicesChanged = true case gvk.Gateway: gatewayChanged = true case gvk.Sidecar: sidecarsChanged = true case gvk.WasmPlugin: wasmPluginsChanged = true case gvk.EnvoyFilter: envoyFiltersChanged = true case gvk.AuthorizationPolicy: authzChanged = true case gvk.RequestAuthentication, gvk.PeerAuthentication: authnChanged = true case gvk.HTTPRoute, gvk.TCPRoute, gvk.GatewayClass, gvk.KubernetesGateway, gvk.TLSRoute, gvk.ReferencePolicy: gatewayAPIChanged = true // VS and GW are derived from gatewayAPI, so if it changed we need to update those as well virtualServicesChanged = true gatewayChanged = true case gvk.Telemetry: telemetryChanged = true case gvk.ProxyConfig: proxyConfigsChanged = true case gvk.ServiceNameMapping: servicenamemappingsChanged = true } } if servicesChanged { // Services have changed. 
initialize service registry if err := ps.initServiceRegistry(env); err != nil { return err } } else { // make sure we copy over things that would be generated in initServiceRegistry ps.ServiceIndex = oldPushContext.ServiceIndex ps.ServiceAccounts = oldPushContext.ServiceAccounts } if servicesChanged || gatewayAPIChanged { // Gateway status depends on services, so recompute if they change as well if err := ps.initKubernetesGateways(env); err != nil { return err } } if virtualServicesChanged { if err := ps.initVirtualServices(env); err != nil { return err } } else { ps.virtualServiceIndex = oldPushContext.virtualServiceIndex } if destinationRulesChanged { if err := ps.initDestinationRules(env); err != nil { return err } } else { ps.destinationRuleIndex = oldPushContext.destinationRuleIndex } if authnChanged { if err := ps.initAuthnPolicies(env); err != nil { return err } } else { ps.AuthnPolicies = oldPushContext.AuthnPolicies } if authzChanged { if err := ps.initAuthorizationPolicies(env); err != nil { authzLog.Errorf("failed to initialize authorization policies: %v", err) return err } } else { ps.AuthzPolicies = oldPushContext.AuthzPolicies } if telemetryChanged { if err := ps.initTelemetry(env); err != nil { return err } } else { ps.Telemetry = oldPushContext.Telemetry } if proxyConfigsChanged { if err := ps.initProxyConfigs(env); err != nil { return err } } else { ps.ProxyConfigs = oldPushContext.ProxyConfigs } if wasmPluginsChanged { if err := ps.initWasmPlugins(env); err != nil { return err } } else { ps.wasmPluginsByNamespace = oldPushContext.wasmPluginsByNamespace } if envoyFiltersChanged { if err := ps.initEnvoyFilters(env); err != nil { return err } } else { ps.envoyFiltersByNamespace = oldPushContext.envoyFiltersByNamespace } if gatewayChanged { if err := ps.initGateways(env); err != nil { return err } } else { ps.gatewayIndex = oldPushContext.gatewayIndex } // Must be initialized in the end // Sidecars need to be updated if services, virtual services, 
destination rules, or the sidecar configs change if servicesChanged || virtualServicesChanged || destinationRulesChanged || sidecarsChanged { if err := ps.initSidecarScopes(env); err != nil { return err } } else { ps.sidecarIndex.sidecarsByNamespace = oldPushContext.sidecarIndex.sidecarsByNamespace } if servicenamemappingsChanged { if err := ps.initServiceNameMappings(env); err != nil { return err } } else { ps.serviceNameMappingIndex = oldPushContext.serviceNameMappingIndex } return nil } // Caches list of services in the registry, and creates a map // of hostname to service func (ps *PushContext) initServiceRegistry(env *Environment) error { // Sort the services in order of creation. allServices := SortServicesByCreationTime(env.Services()) for _, s := range allServices { svcKey := s.Key() // Precache instances for _, port := range s.Ports { if _, ok := ps.ServiceIndex.instancesByPort[svcKey]; !ok { ps.ServiceIndex.instancesByPort[svcKey] = make(map[int][]*ServiceInstance) } instances := make([]*ServiceInstance, 0) instances = append(instances, env.InstancesByPort(s, port.Port, nil)...) ps.ServiceIndex.instancesByPort[svcKey][port.Port] = instances } if _, f := ps.ServiceIndex.HostnameAndNamespace[s.Hostname]; !f { ps.ServiceIndex.HostnameAndNamespace[s.Hostname] = map[string]*Service{} } ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace] = s ns := s.Attributes.Namespace if len(s.Attributes.ExportTo) == 0 { if ps.exportToDefaults.service[visibility.Private] { ps.ServiceIndex.privateByNamespace[ns] = append(ps.ServiceIndex.privateByNamespace[ns], s) } else if ps.exportToDefaults.service[visibility.Public] { ps.ServiceIndex.public = append(ps.ServiceIndex.public, s) } } else { // if service has exportTo ~ - i.e. 
not visible to anyone, ignore all exportTos // if service has exportTo *, make public and ignore all other exportTos // if service has exportTo ., replace with current namespace if s.Attributes.ExportTo[visibility.Public] { ps.ServiceIndex.public = append(ps.ServiceIndex.public, s) continue } else if s.Attributes.ExportTo[visibility.None] { continue } else { // . or other namespaces for exportTo := range s.Attributes.ExportTo { if exportTo == visibility.Private || string(exportTo) == ns { // exportTo with same namespace is effectively private ps.ServiceIndex.privateByNamespace[ns] = append(ps.ServiceIndex.privateByNamespace[ns], s) } else { // exportTo is a specific target namespace ps.ServiceIndex.exportedToNamespace[string(exportTo)] = append(ps.ServiceIndex.exportedToNamespace[string(exportTo)], s) } } } } } ps.initServiceAccounts(env, allServices) return nil } // SortServicesByCreationTime sorts the list of services in ascending order by their creation time (if available). func SortServicesByCreationTime(services []*Service) []*Service { sort.SliceStable(services, func(i, j int) bool { // If creation time is the same, then behavior is nondeterministic. In this case, we can // pick an arbitrary but consistent ordering based on name and namespace, which is unique. // CreationTimestamp is stored in seconds, so this is not uncommon. if services[i].CreationTime.Equal(services[j].CreationTime) { in := services[i].Attributes.Name + "." + services[i].Attributes.Namespace jn := services[j].Attributes.Name + "." 
+ services[j].Attributes.Namespace return in < jn } return services[i].CreationTime.Before(services[j].CreationTime) }) return services } // Caches list of service accounts in the registry func (ps *PushContext) initServiceAccounts(env *Environment, services []*Service) { for _, svc := range services { if ps.ServiceAccounts[svc.Hostname] == nil { ps.ServiceAccounts[svc.Hostname] = map[int][]string{} } for _, port := range svc.Ports { if port.Protocol == protocol.UDP { continue } ps.ServiceAccounts[svc.Hostname][port.Port] = env.GetIstioServiceAccounts(svc, []int{port.Port}) } } } // Caches list of authentication policies func (ps *PushContext) initAuthnPolicies(env *Environment) error { // Init beta policy. var err error ps.AuthnPolicies, err = initAuthenticationPolicies(env) return err } // Caches list of virtual services func (ps *PushContext) initVirtualServices(env *Environment) error { ps.virtualServiceIndex.exportedToNamespaceByGateway = map[string]map[string][]config.Config{} ps.virtualServiceIndex.privateByNamespaceAndGateway = map[string]map[string][]config.Config{} ps.virtualServiceIndex.publicByGateway = map[string][]config.Config{} virtualServices, err := env.List(gvk.VirtualService, NamespaceAll) if err != nil { return err } // values returned from ConfigStore.List are immutable. // Therefore, we make a copy vservices := make([]config.Config, len(virtualServices)) for i := range vservices { vservices[i] = virtualServices[i].DeepCopy() } totalVirtualServices.Record(float64(len(virtualServices))) // TODO(rshriram): parse each virtual service and maintain a map of the // virtualservice name, the list of registry hosts in the VS and non // registry DNS names in the VS. This should cut down processing in // the RDS code. 
See separateVSHostsAndServices in route/route.go sortConfigByCreationTime(vservices) // convert all shortnames in virtual services into FQDNs for _, r := range vservices { resolveVirtualServiceShortnames(r.Spec.(*networking.VirtualService), r.Meta) } vservices, ps.virtualServiceIndex.delegates = mergeVirtualServicesIfNeeded(vservices, ps.exportToDefaults.virtualService) for _, virtualService := range vservices { ns := virtualService.Namespace rule := virtualService.Spec.(*networking.VirtualService) gwNames := getGatewayNames(rule) if len(rule.ExportTo) == 0 { // No exportTo in virtualService. Use the global default // We only honor ., * if ps.exportToDefaults.virtualService[visibility.Private] { if _, f := ps.virtualServiceIndex.privateByNamespaceAndGateway[ns]; !f { ps.virtualServiceIndex.privateByNamespaceAndGateway[ns] = map[string][]config.Config{} } // add to local namespace only private := ps.virtualServiceIndex.privateByNamespaceAndGateway for _, gw := range gwNames { private[ns][gw] = append(private[ns][gw], virtualService) } } else if ps.exportToDefaults.virtualService[visibility.Public] { for _, gw := range gwNames { ps.virtualServiceIndex.publicByGateway[gw] = append(ps.virtualServiceIndex.publicByGateway[gw], virtualService) } } } else { exportToMap := make(map[visibility.Instance]bool) for _, e := range rule.ExportTo { exportToMap[visibility.Instance(e)] = true } // if vs has exportTo ~ - i.e. not visible to anyone, ignore all exportTos // if vs has exportTo *, make public and ignore all other exportTos // if vs has exportTo ., replace with current namespace if exportToMap[visibility.Public] { for _, gw := range gwNames { ps.virtualServiceIndex.publicByGateway[gw] = append(ps.virtualServiceIndex.publicByGateway[gw], virtualService) } continue } else if exportToMap[visibility.None] { // not possible continue } else { // . 
or other namespaces for exportTo := range exportToMap { if exportTo == visibility.Private || string(exportTo) == ns { if _, f := ps.virtualServiceIndex.privateByNamespaceAndGateway[ns]; !f { ps.virtualServiceIndex.privateByNamespaceAndGateway[ns] = map[string][]config.Config{} } // add to local namespace only for _, gw := range gwNames { ps.virtualServiceIndex.privateByNamespaceAndGateway[ns][gw] = append(ps.virtualServiceIndex.privateByNamespaceAndGateway[ns][gw], virtualService) } } else { if _, f := ps.virtualServiceIndex.exportedToNamespaceByGateway[string(exportTo)]; !f { ps.virtualServiceIndex.exportedToNamespaceByGateway[string(exportTo)] = map[string][]config.Config{} } exported := ps.virtualServiceIndex.exportedToNamespaceByGateway // add to local namespace only for _, gw := range gwNames { exported[string(exportTo)][gw] = append(exported[string(exportTo)][gw], virtualService) } } } } } } return nil } var meshGateways = []string{constants.IstioMeshGateway} func getGatewayNames(vs *networking.VirtualService) []string { if len(vs.Gateways) == 0 { return meshGateways } res := make([]string, 0, len(vs.Gateways)) res = append(res, vs.Gateways...) 
return res } func (ps *PushContext) initDefaultExportMaps() { ps.exportToDefaults.destinationRule = make(map[visibility.Instance]bool) if ps.Mesh.DefaultDestinationRuleExportTo != nil { for _, e := range ps.Mesh.DefaultDestinationRuleExportTo { ps.exportToDefaults.destinationRule[visibility.Instance(e)] = true } } else { // default to * ps.exportToDefaults.destinationRule[visibility.Public] = true } ps.exportToDefaults.service = make(map[visibility.Instance]bool) if ps.Mesh.DefaultServiceExportTo != nil { for _, e := range ps.Mesh.DefaultServiceExportTo { ps.exportToDefaults.service[visibility.Instance(e)] = true } } else { ps.exportToDefaults.service[visibility.Public] = true } ps.exportToDefaults.virtualService = make(map[visibility.Instance]bool) if ps.Mesh.DefaultVirtualServiceExportTo != nil { for _, e := range ps.Mesh.DefaultVirtualServiceExportTo { ps.exportToDefaults.virtualService[visibility.Instance(e)] = true } } else { ps.exportToDefaults.virtualService[visibility.Public] = true } } // initSidecarScopes synthesizes Sidecar CRDs into objects called // SidecarScope. The SidecarScope object is a semi-processed view of the // service registry, and config state associated with the sidecar CRD. The // scope contains a set of inbound and outbound listeners, services/configs // per listener, etc. The sidecar scopes are precomputed based on the // Sidecar API objects in each namespace. If there is no sidecar api object // for a namespace, a default sidecarscope is assigned to the namespace // which enables connectivity to all services in the mesh. // // When proxies connect to Pilot, we identify the sidecar scope associated // with the proxy and derive listeners/routes/clusters based on the sidecar // scope. 
func (ps *PushContext) initSidecarScopes(env *Environment) error { sidecarConfigs, err := env.List(gvk.Sidecar, NamespaceAll) if err != nil { return err } sortConfigByCreationTime(sidecarConfigs) sidecarConfigWithSelector := make([]config.Config, 0) sidecarConfigWithoutSelector := make([]config.Config, 0) sidecarsWithoutSelectorByNamespace := sets.New() for _, sidecarConfig := range sidecarConfigs { sidecar := sidecarConfig.Spec.(*networking.Sidecar) if sidecar.WorkloadSelector != nil { sidecarConfigWithSelector = append(sidecarConfigWithSelector, sidecarConfig) } else { sidecarsWithoutSelectorByNamespace.Insert(sidecarConfig.Namespace) sidecarConfigWithoutSelector = append(sidecarConfigWithoutSelector, sidecarConfig) } } sidecarNum := len(sidecarConfigs) sidecarConfigs = make([]config.Config, 0, sidecarNum) // sidecars with selector take preference sidecarConfigs = append(sidecarConfigs, sidecarConfigWithSelector...) sidecarConfigs = append(sidecarConfigs, sidecarConfigWithoutSelector...) // Hold reference root namespace's sidecar config // Root namespace can have only one sidecar config object // Currently we expect that it has no workloadSelectors var rootNSConfig *config.Config ps.sidecarIndex.sidecarsByNamespace = make(map[string][]*SidecarScope, sidecarNum) for i, sidecarConfig := range sidecarConfigs { ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace] = append(ps.sidecarIndex.sidecarsByNamespace[sidecarConfig.Namespace], ConvertToSidecarScope(ps, &sidecarConfig, sidecarConfig.Namespace)) if rootNSConfig == nil && sidecarConfig.Namespace == ps.Mesh.RootNamespace && sidecarConfig.Spec.(*networking.Sidecar).WorkloadSelector == nil { rootNSConfig = &sidecarConfigs[i] } } ps.sidecarIndex.rootConfig = rootNSConfig return nil } // Split out of DestinationRule expensive conversions - once per push. 
func (ps *PushContext) initDestinationRules(env *Environment) error { configs, err := env.List(gvk.DestinationRule, NamespaceAll) if err != nil { return err } // values returned from ConfigStore.List are immutable. // Therefore, we make a copy destRules := make([]config.Config, len(configs)) for i := range destRules { destRules[i] = configs[i].DeepCopy() } ps.SetDestinationRules(destRules) return nil } func newConsolidatedDestRules() *consolidatedDestRules { return &consolidatedDestRules{ exportTo: map[host.Name]map[visibility.Instance]bool{}, destRules: map[host.Name][]*consolidatedDestRule{}, } } // SetDestinationRules is updates internal structures using a set of configs. // Split out of DestinationRule expensive conversions, computed once per push. // This also allows tests to inject a config without having the mock. // This will not work properly for Sidecars, which will precompute their destination rules on init func (ps *PushContext) SetDestinationRules(configs []config.Config) { // Sort by time first. So if two destination rule have top level traffic policies // we take the first one. 
sortConfigByCreationTime(configs) namespaceLocalDestRules := make(map[string]*consolidatedDestRules) exportedDestRulesByNamespace := make(map[string]*consolidatedDestRules) rootNamespaceLocalDestRules := newConsolidatedDestRules() inheritedConfigs := make(map[string]*consolidatedDestRule) for i := range configs { rule := configs[i].Spec.(*networking.DestinationRule) if features.EnableDestinationRuleInheritance && rule.Host == "" { if t, ok := inheritedConfigs[configs[i].Namespace]; ok { log.Warnf("Namespace/mesh-level DestinationRule is already defined for %q at time %v."+ " Ignore %q which was created at time %v", configs[i].Namespace, t.rule.CreationTimestamp, configs[i].Name, configs[i].CreationTimestamp) continue } inheritedConfigs[configs[i].Namespace] = convertConsolidatedDestRule(&configs[i]) } rule.Host = string(ResolveShortnameToFQDN(rule.Host, configs[i].Meta)) exportToMap := make(map[visibility.Instance]bool) // destination rules with workloadSelector should not be exported to other namespaces if rule.GetWorkloadSelector() == nil { for _, e := range rule.ExportTo { exportToMap[visibility.Instance(e)] = true } } else { exportToMap[visibility.Private] = true } // add only if the dest rule is exported with . or * or explicit exportTo containing this namespace // The global exportTo doesn't matter here (its either . or * - both of which are applicable here) if len(exportToMap) == 0 || exportToMap[visibility.Public] || exportToMap[visibility.Private] || exportToMap[visibility.Instance(configs[i].Namespace)] { // Store in an index for the config's namespace // a proxy from this namespace will first look here for the destination rule for a given service // This pool consists of both public/private destination rules. 
if _, exist := namespaceLocalDestRules[configs[i].Namespace]; !exist { namespaceLocalDestRules[configs[i].Namespace] = newConsolidatedDestRules() } // Merge this destination rule with any public/private dest rules for same host in the same namespace // If there are no duplicates, the dest rule will be added to the list ps.mergeDestinationRule(namespaceLocalDestRules[configs[i].Namespace], configs[i], exportToMap) } isPrivateOnly := false // No exportTo in destinationRule. Use the global default // We only honor . and * if len(exportToMap) == 0 && ps.exportToDefaults.destinationRule[visibility.Private] { isPrivateOnly = true } else if len(exportToMap) == 1 && (exportToMap[visibility.Private] || exportToMap[visibility.Instance(configs[i].Namespace)]) { isPrivateOnly = true } if !isPrivateOnly { if _, exist := exportedDestRulesByNamespace[configs[i].Namespace]; !exist { exportedDestRulesByNamespace[configs[i].Namespace] = newConsolidatedDestRules() } // Merge this destination rule with any other exported dest rule for the same host in the same namespace // If there are no duplicates, the dest rule will be added to the list ps.mergeDestinationRule(exportedDestRulesByNamespace[configs[i].Namespace], configs[i], exportToMap) } else if configs[i].Namespace == ps.Mesh.RootNamespace { // Keep track of private root namespace destination rules ps.mergeDestinationRule(rootNamespaceLocalDestRules, configs[i], exportToMap) } } // precompute DestinationRules with inherited fields if features.EnableDestinationRuleInheritance { globalRule := inheritedConfigs[ps.Mesh.RootNamespace] for ns := range namespaceLocalDestRules { nsRule := inheritedConfigs[ns] inheritedRule := ps.inheritDestinationRule(globalRule, nsRule) for hostname, cfgList := range namespaceLocalDestRules[ns].destRules { for i, cfg := range cfgList { namespaceLocalDestRules[ns].destRules[hostname][i] = ps.inheritDestinationRule(inheritedRule, cfg) } } // update namespace rule after it has been merged with mesh rule 
inheritedConfigs[ns] = inheritedRule } // can't precalculate exportedDestRulesByNamespace since we don't know all the client namespaces in advance // inheritance is performed in getExportedDestinationRuleFromNamespace } ps.destinationRuleIndex.namespaceLocal = namespaceLocalDestRules ps.destinationRuleIndex.exportedByNamespace = exportedDestRulesByNamespace ps.destinationRuleIndex.rootNamespaceLocal = rootNamespaceLocalDestRules ps.destinationRuleIndex.inheritedByNamespace = inheritedConfigs } func (ps *PushContext) initAuthorizationPolicies(env *Environment) error { var err error if ps.AuthzPolicies, err = GetAuthorizationPolicies(env); err != nil { authzLog.Errorf("failed to initialize authorization policies: %v", err) return err } return nil } func (ps *PushContext) initTelemetry(env *Environment) (err error) { if ps.Telemetry, err = getTelemetries(env); err != nil { telemetryLog.Errorf("failed to initialize telemetry: %v", err) return } return } func (ps *PushContext) initProxyConfigs(env *Environment) error { var err error if ps.ProxyConfigs, err = GetProxyConfigs(env.ConfigStore, env.Mesh()); err != nil { pclog.Errorf("failed to initialize proxy configs: %v", err) return err } return nil } // pre computes WasmPlugins per namespace func (ps *PushContext) initWasmPlugins(env *Environment) error { wasmplugins, err := env.List(gvk.WasmPlugin, NamespaceAll) if err != nil { return err } sortConfigByCreationTime(wasmplugins) ps.wasmPluginsByNamespace = map[string][]*WasmPluginWrapper{} for _, plugin := range wasmplugins { if pluginWrapper := convertToWasmPluginWrapper(plugin); pluginWrapper != nil { ps.wasmPluginsByNamespace[plugin.Namespace] = append(ps.wasmPluginsByNamespace[plugin.Namespace], pluginWrapper) } } return nil } // WasmPlugins return the WasmPluginWrappers of a proxy func (ps *PushContext) WasmPlugins(proxy *Proxy) map[extensions.PluginPhase][]*WasmPluginWrapper { if proxy == nil { return nil } matchedPlugins := 
make(map[extensions.PluginPhase][]*WasmPluginWrapper) // First get all the extension configs from the config root namespace // and then add the ones from proxy's own namespace if ps.Mesh.RootNamespace != "" { // if there is no workload selector, the config applies to all workloads // if there is a workload selector, check for matching workload labels for _, plugin := range ps.wasmPluginsByNamespace[ps.Mesh.RootNamespace] { if plugin.Selector == nil || labels.Instance(plugin.Selector.MatchLabels).SubsetOf(proxy.Metadata.Labels) { matchedPlugins[plugin.Phase] = append(matchedPlugins[plugin.Phase], plugin) } } } // To prevent duplicate extensions in case root namespace equals proxy's namespace if proxy.ConfigNamespace != ps.Mesh.RootNamespace { for _, plugin := range ps.wasmPluginsByNamespace[proxy.ConfigNamespace] { if plugin.Selector == nil || labels.Instance(plugin.Selector.MatchLabels).SubsetOf(proxy.Metadata.Labels) { matchedPlugins[plugin.Phase] = append(matchedPlugins[plugin.Phase], plugin) } } } // sort slices by priority for i, slice := range matchedPlugins { sort.SliceStable(slice, func(i, j int) bool { iPriority := int64(math.MinInt64) if prio := slice[i].Priority; prio != nil { iPriority = prio.Value } jPriority := int64(math.MinInt64) if prio := slice[j].Priority; prio != nil { jPriority = prio.Value } return iPriority > jPriority }) matchedPlugins[i] = slice } return matchedPlugins } // pre computes envoy filters per namespace func (ps *PushContext) initEnvoyFilters(env *Environment) error { envoyFilterConfigs, err := env.List(gvk.EnvoyFilter, NamespaceAll) if err != nil { return err } sort.Slice(envoyFilterConfigs, func(i, j int) bool { ifilter := envoyFilterConfigs[i].Spec.(*networking.EnvoyFilter) jfilter := envoyFilterConfigs[j].Spec.(*networking.EnvoyFilter) if ifilter.Priority != jfilter.Priority { return ifilter.Priority < jfilter.Priority } // If priority is same fallback to name and creation timestamp, else use priority. 
// If creation time is the same, then behavior is nondeterministic. In this case, we can // pick an arbitrary but consistent ordering based on name and namespace, which is unique. // CreationTimestamp is stored in seconds, so this is not uncommon. if envoyFilterConfigs[i].CreationTimestamp != envoyFilterConfigs[j].CreationTimestamp { return envoyFilterConfigs[i].CreationTimestamp.Before(envoyFilterConfigs[j].CreationTimestamp) } in := envoyFilterConfigs[i].Name + "." + envoyFilterConfigs[i].Namespace jn := envoyFilterConfigs[j].Name + "." + envoyFilterConfigs[j].Namespace return in < jn }) ps.envoyFiltersByNamespace = make(map[string][]*EnvoyFilterWrapper) for _, envoyFilterConfig := range envoyFilterConfigs { efw := convertToEnvoyFilterWrapper(&envoyFilterConfig) if _, exists := ps.envoyFiltersByNamespace[envoyFilterConfig.Namespace]; !exists { ps.envoyFiltersByNamespace[envoyFilterConfig.Namespace] = make([]*EnvoyFilterWrapper, 0) } ps.envoyFiltersByNamespace[envoyFilterConfig.Namespace] = append(ps.envoyFiltersByNamespace[envoyFilterConfig.Namespace], efw) } return nil } // EnvoyFilters return the merged EnvoyFilterWrapper of a proxy func (ps *PushContext) EnvoyFilters(proxy *Proxy) *EnvoyFilterWrapper { // this should never happen if proxy == nil { return nil } var matchedEnvoyFilters []*EnvoyFilterWrapper // EnvoyFilters supports inheritance (global ones plus namespace local ones). // First get all the filter configs from the config root namespace // and then add the ones from proxy's own namespace if ps.Mesh.RootNamespace != "" { matchedEnvoyFilters = ps.getMatchedEnvoyFilters(proxy, ps.Mesh.RootNamespace) } // To prevent duplicate envoyfilters in case root namespace equals proxy's namespace if proxy.ConfigNamespace != ps.Mesh.RootNamespace { matched := ps.getMatchedEnvoyFilters(proxy, proxy.ConfigNamespace) matchedEnvoyFilters = append(matchedEnvoyFilters, matched...) 
} var out *EnvoyFilterWrapper if len(matchedEnvoyFilters) > 0 { out = &EnvoyFilterWrapper{ // no need populate workloadSelector, as it is not used later. Patches: make(map[networking.EnvoyFilter_ApplyTo][]*EnvoyFilterConfigPatchWrapper), } // merge EnvoyFilterWrapper for _, efw := range matchedEnvoyFilters { for applyTo, cps := range efw.Patches { for _, cp := range cps { if proxyMatch(proxy, cp) { out.Patches[applyTo] = append(out.Patches[applyTo], cp) } } } } } return out } // if there is no workload selector, the config applies to all workloads // if there is a workload selector, check for matching workload labels func (ps *PushContext) getMatchedEnvoyFilters(proxy *Proxy, namespaces string) []*EnvoyFilterWrapper { matchedEnvoyFilters := make([]*EnvoyFilterWrapper, 0) for _, efw := range ps.envoyFiltersByNamespace[namespaces] { if efw.workloadSelector == nil || efw.workloadSelector.SubsetOf(proxy.Metadata.Labels) { matchedEnvoyFilters = append(matchedEnvoyFilters, efw) } } return matchedEnvoyFilters } // HasEnvoyFilters checks if an EnvoyFilter exists with the given name at the given namespace. 
func (ps *PushContext) HasEnvoyFilters(name, namespace string) bool { for _, efw := range ps.envoyFiltersByNamespace[namespace] { if efw.Name == name { return true } } return false } // pre computes gateways per namespace func (ps *PushContext) initGateways(env *Environment) error { gatewayConfigs, err := env.List(gvk.Gateway, NamespaceAll) if err != nil { return err } sortConfigByCreationTime(gatewayConfigs) if features.ScopeGatewayToNamespace { ps.gatewayIndex.namespace = make(map[string][]config.Config) for _, gatewayConfig := range gatewayConfigs { if _, exists := ps.gatewayIndex.namespace[gatewayConfig.Namespace]; !exists { ps.gatewayIndex.namespace[gatewayConfig.Namespace] = make([]config.Config, 0) } ps.gatewayIndex.namespace[gatewayConfig.Namespace] = append(ps.gatewayIndex.namespace[gatewayConfig.Namespace], gatewayConfig) } } else { ps.gatewayIndex.all = gatewayConfigs } return nil } func (ps *PushContext) initServiceMetadata(env *Environment) error { metadataConfig, err := env.List(gvk.ServiceMetadata, NamespaceAll) if err != nil { return err } sortConfigByCreationTime(metadataConfig) ps.serviceMetadataIndex.namespace = make(map[string][]*config.Config) ps.serviceMetadataIndex.applicationNameByNamespace = make(map[string]map[string]*config.Config) ps.serviceMetadataIndex.all = make([]*config.Config, 0) for _, conf := range metadataConfig { if _, ok := ps.serviceMetadataIndex.namespace[conf.Namespace]; !ok { ps.serviceMetadataIndex.namespace[conf.Namespace] = make([]*config.Config, 0) } if _, ok := ps.serviceMetadataIndex.applicationNameByNamespace[conf.Namespace]; !ok { ps.serviceMetadataIndex.applicationNameByNamespace[conf.Namespace] = make(map[string]*config.Config, 0) } ps.serviceMetadataIndex.namespace[conf.Namespace] = append(ps.serviceMetadataIndex.namespace[conf.Namespace], &conf) ps.serviceMetadataIndex.applicationNameByNamespace[conf.Namespace][conf.Name] = &conf ps.serviceMetadataIndex.all = append(ps.serviceMetadataIndex.all, &conf) } return 
nil } // InternalGatewayServiceAnnotation represents the hostname of the service a gateway will use. This is // only used internally to transfer information from the Kubernetes Gateway API to the Istio Gateway API // which does not have a field to represent this. // The format is a comma separated list of hostnames. For example, "ingress.dubbo-system.svc.cluster.local,ingress.example.com" // The Gateway will apply to all ServiceInstances of these services, *in the same namespace as the Gateway*. const InternalGatewayServiceAnnotation = "internal.istio.io/gateway-service" type gatewayWithInstances struct { gateway config.Config // If true, ports that are not present in any instance will be used directly (without targetPort translation) // This supports the legacy behavior of selecting gateways by pod label selector legacyGatewaySelector bool instances []*ServiceInstance } func (ps *PushContext) mergeGateways(proxy *Proxy) *MergedGateway { // this should never happen if proxy == nil { return nil } gatewayInstances := make([]gatewayWithInstances, 0) var configs []config.Config if features.ScopeGatewayToNamespace { configs = ps.gatewayIndex.namespace[proxy.ConfigNamespace] } else { configs = ps.gatewayIndex.all } for _, cfg := range configs { gw := cfg.Spec.(*networking.Gateway) if gwsvcstr, f := cfg.Annotations[InternalGatewayServiceAnnotation]; f { gwsvcs := strings.Split(gwsvcstr, ",") known := map[host.Name]struct{}{} for _, g := range gwsvcs { known[host.Name(g)] = struct{}{} } matchingInstances := make([]*ServiceInstance, 0, len(proxy.ServiceInstances)) for _, si := range proxy.ServiceInstances { if _, f := known[si.Service.Hostname]; f && si.Service.Attributes.Namespace == cfg.Namespace { matchingInstances = append(matchingInstances, si) } } // Only if we have a matching instance should we apply the configuration if len(matchingInstances) > 0 { gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, false, matchingInstances}) } } else if 
gw.GetSelector() == nil { // no selector. Applies to all workloads asking for the gateway gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceInstances}) } else { gatewaySelector := labels.Instance(gw.GetSelector()) if gatewaySelector.SubsetOf(proxy.Metadata.Labels) { gatewayInstances = append(gatewayInstances, gatewayWithInstances{cfg, true, proxy.ServiceInstances}) } } } if len(gatewayInstances) == 0 { return nil } return MergeGateways(gatewayInstances, proxy, ps) } // GatewayContext contains a minimal subset of push context functionality to be exposed to GatewayAPIControllers type GatewayContext struct { ps *PushContext } func NewGatewayContext(ps *PushContext) GatewayContext { return GatewayContext{ps} } // ResolveGatewayInstances attempts to resolve all instances that a gateway will be exposed on. // Note: this function considers *all* instances of the service; its possible those instances will not actually be properly functioning // gateways, so this is not 100% accurate, but sufficient to expose intent to users. // The actual configuration generation is done on a per-workload basis and will get the exact set of matched instances for that workload. // Three sets are exposed: // * Internal addresses (ie istio-ingressgateway.dubbo-system.svc.cluster.local:80). // * External addresses (ie 1.2.3.4), this comes from LoadBalancer services. There may be multiple in some cases (especially multi cluster). // * Warnings for references that could not be resolved. These are intended to be user facing. 
func (gc GatewayContext) ResolveGatewayInstances(namespace string, gwsvcs []string, servers []*networking.Server) (internal, external, warns []string) { ports := map[int]struct{}{} for _, s := range servers { ports[int(s.Port.Number)] = struct{}{} } foundInternal := sets.New() foundExternal := sets.New() warnings := []string{} for _, g := range gwsvcs { svc, f := gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)][namespace] if !f { otherNamespaces := []string{} for ns := range gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)] { otherNamespaces = append(otherNamespaces, `"`+ns+`"`) // Wrap in quotes for output } if len(otherNamespaces) > 0 { sort.Strings(otherNamespaces) warnings = append(warnings, fmt.Sprintf("hostname %q not found in namespace %q, but it was found in namespace(s) %v", g, namespace, strings.Join(otherNamespaces, ", "))) } else { warnings = append(warnings, fmt.Sprintf("hostname %q not found", g)) } continue } svcKey := svc.Key() for port := range ports { instances := gc.ps.ServiceIndex.instancesByPort[svcKey][port] if len(instances) > 0 { foundInternal.Insert(fmt.Sprintf("%s:%d", g, port)) // Fetch external IPs from all clusters svc.Attributes.ClusterExternalAddresses.ForEach(func(c cluster.ID, externalIPs []string) { foundExternal.InsertAll(externalIPs...) }) } else { if instancesEmpty(gc.ps.ServiceIndex.instancesByPort[svcKey]) { warnings = append(warnings, fmt.Sprintf("no instances found for hostname %q", g)) } else { hintPort := sets.New() for _, instances := range gc.ps.ServiceIndex.instancesByPort[svcKey] { for _, i := range instances { if i.Endpoint.EndpointPort == uint32(port) { hintPort.Insert(strconv.Itoa(i.ServicePort.Port)) } } } if len(hintPort) > 0 { warnings = append(warnings, fmt.Sprintf( "port %d not found for hostname %q (hint: the service port should be specified, not the workload port. 
Did you mean one of these ports: %v?)", port, g, hintPort.SortedList())) } else { warnings = append(warnings, fmt.Sprintf("port %d not found for hostname %q", port, g)) } } } } } sort.Strings(warnings) return foundInternal.SortedList(), foundExternal.SortedList(), warnings } func instancesEmpty(m map[int][]*ServiceInstance) bool { for _, instances := range m { if len(instances) > 0 { return false } } return true } func (ps *PushContext) NetworkManager() *NetworkManager { return ps.networkMgr } // BestEffortInferServiceMTLSMode infers the mTLS mode for the service + port from all authentication // policies (both alpha and beta) in the system. The function always returns MTLSUnknown for external service. // The result is a best effort. It is because the PeerAuthentication is workload-based, this function is unable // to compute the correct service mTLS mode without knowing service to workload binding. For now, this // function uses only mesh and namespace level PeerAuthentication and ignore workload & port level policies. // This function is used to give a hint for auto-mTLS configuration on client side. func (ps *PushContext) BestEffortInferServiceMTLSMode(tp *networking.TrafficPolicy, service *Service, port *Port) MutualTLSMode { if service.MeshExternal { // Only need the authentication mTLS mode when service is not external. return MTLSUnknown } // For passthrough traffic (headless service or explicitly defined in DestinationRule), we look at the instances // If ALL instances have a sidecar, we enable TLS, otherwise we disable // TODO(https://github.com/istio/istio/issues/27376) enable mixed deployments // A service with passthrough resolution is always passthrough, regardless of the TrafficPolicy. 
if service.Resolution == Passthrough || tp.GetLoadBalancer().GetSimple() == networking.LoadBalancerSettings_PASSTHROUGH { instances := ps.ServiceInstancesByPort(service, port.Port, nil) if len(instances) == 0 { return MTLSDisable } for _, i := range instances { // Infer mTls disabled if any of the endpoint is with tls disabled if i.Endpoint.TLSMode == DisabledTLSModeLabel { return MTLSDisable } } } // 2. check mTLS settings from beta policy (i.e PeerAuthentication) at namespace / mesh level. // If the mode is not unknown, use it. if serviceMTLSMode := ps.AuthnPolicies.GetNamespaceMutualTLSMode(service.Attributes.Namespace); serviceMTLSMode != MTLSUnknown { return serviceMTLSMode } // Fallback to permissive. return MTLSPermissive } // ServiceInstancesByPort returns the cached instances by port if it exists. func (ps *PushContext) ServiceInstancesByPort(svc *Service, port int, labels labels.Instance) []*ServiceInstance { out := []*ServiceInstance{} if instances, exists := ps.ServiceIndex.instancesByPort[svc.Key()][port]; exists { // Use cached version of instances by port when labels are empty. if len(labels) == 0 { return instances } // If there are labels, we will filter instances by pod labels. for _, instance := range instances { // check that one of the input labels is a subset of the labels if labels.SubsetOf(instance.Endpoint.Labels) { out = append(out, instance) } } } return out } // initKubernetesGateways initializes Kubernetes gateway-api objects func (ps *PushContext) initKubernetesGateways(env *Environment) error { if env.GatewayAPIController != nil { ps.GatewayAPIController = env.GatewayAPIController return env.GatewayAPIController.Recompute(GatewayContext{ps}) } return nil } // Split out of ServiceNameMapping expensive conversions - once per push. 
func (ps *PushContext) initServiceNameMappings(env *Environment) error { configs, err := env.List(gvk.ServiceNameMapping, NamespaceAll) if err != nil { return err } // values returned from ConfigStore.List are immutable. // Therefore, we make a copy snpMappings := make([]*config.Config, len(configs)) for i := range snpMappings { deepCopy := configs[i].DeepCopy() snpMappings[i] = &deepCopy } for _, snp := range snpMappings { byNamespace := ps.serviceNameMappingIndex.namespace if _, exists := byNamespace[snp.Namespace]; !exists { byNamespace[snp.Namespace] = make([]*config.Config, 0) } byNamespace[snp.Namespace] = append(byNamespace[snp.Namespace], snp) interfaceByNamespace := ps.serviceNameMappingIndex.interfaceByNamespace if _, exists := interfaceByNamespace[snp.Namespace]; !exists { interfaceByNamespace[snp.Namespace] = make(map[string]*config.Config, 0) } mapping := snp.Spec.(*extensions.ServiceNameMapping) interfaceByNamespace[snp.Namespace][mapping.GetInterfaceName()] = snp } ps.serviceNameMappingIndex.all = snpMappings return nil } // ReferenceAllowed determines if a given resource (of type `kind` and name `resourceName`) can be // accessed by `namespace`, based of specific reference policies. // Note: this function only determines if a reference is *explicitly* allowed; the reference may not require // explicitly authorization to be made at all in most cases. Today, this only is for allowing cross-namespace // secret access. func (ps *PushContext) ReferenceAllowed(kind config.GroupVersionKind, resourceName string, namespace string) bool { // Currently, only Secret has reference policy, and only implemented by Gateway API controller. switch kind { case gvk.Secret: if ps.GatewayAPIController != nil { return ps.GatewayAPIController.SecretAllowed(resourceName, namespace) } default: } return false }