pilot/pkg/model/telemetry.go (785 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package model import ( "sort" "strings" "sync" "time" ) import ( listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" httpwasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" wasmfilter "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/wasm/v3" wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" wrappers "google.golang.org/protobuf/types/known/wrapperspb" sd "istio.io/api/envoy/extensions/stackdriver/config/v1alpha1" "istio.io/api/envoy/extensions/stats" meshconfig "istio.io/api/mesh/v1alpha1" tpb "istio.io/api/telemetry/v1alpha1" istiolog "istio.io/pkg/log" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking" "github.com/apache/dubbo-go-pixiu/pkg/config/labels" "github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections" "github.com/apache/dubbo-go-pixiu/pkg/config/xds" "github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal" "github.com/apache/dubbo-go-pixiu/pkg/util/sets" ) var telemetryLog = istiolog.RegisterScope("telemetry", "Istio Telemetry", 0) // Telemetry holds configuration for Telemetry API resources. type Telemetry struct { Name string `json:"name"` Namespace string `json:"namespace"` Spec *tpb.Telemetry `json:"spec"` } // Telemetries organizes Telemetry configuration by namespace. type Telemetries struct { // Maps from namespace to the Telemetry configs. NamespaceToTelemetries map[string][]Telemetry `json:"namespace_to_telemetries"` // The name of the root namespace. RootNamespace string `json:"root_namespace"` // Computed meshConfig meshConfig *meshconfig.MeshConfig // computedMetricsFilters contains the set of cached HCM/listener filters for the metrics portion. // These filters are extremely costly, as we insert them into every listener on every proxy, and to // generate them we need to merge many telemetry specs and perform 2 Any marshals. // To improve performance, we store a cache based on the Telemetries that impacted the filter, as well as // its class and protocol. This is protected by mu. // Currently, this only applies to metrics, but a similar concept can likely be applied to logging and // tracing for performance. // The computedMetricsFilters lifetime is bound to the Telemetries object. During a push context // creation, we will preserve the Telemetries (and thus the cache) if not Telemetries are modified. // As result, this cache will live until any Telemetry is modified. computedMetricsFilters map[metricsKey]interface{} mu sync.Mutex } // telemetryKey defines a key into the computedMetricsFilters cache. type telemetryKey struct { // Root stores the Telemetry in the root namespace, if any Root NamespacedName // Namespace stores the Telemetry in the root namespace, if any Namespace NamespacedName // Workload stores the Telemetry in the root namespace, if any Workload NamespacedName } // metricsKey defines a key into the computedMetricsFilters cache. type metricsKey struct { telemetryKey Class networking.ListenerClass Protocol networking.ListenerProtocol } // getTelemetries returns the Telemetry configurations for the given environment. func getTelemetries(env *Environment) (*Telemetries, error) { telemetries := &Telemetries{ NamespaceToTelemetries: map[string][]Telemetry{}, RootNamespace: env.Mesh().GetRootNamespace(), meshConfig: env.Mesh(), computedMetricsFilters: map[metricsKey]interface{}{}, } fromEnv, err := env.List(collections.IstioTelemetryV1Alpha1Telemetries.Resource().GroupVersionKind(), NamespaceAll) if err != nil { return nil, err } sortConfigByCreationTime(fromEnv) for _, config := range fromEnv { telemetry := Telemetry{ Name: config.Name, Namespace: config.Namespace, Spec: config.Spec.(*tpb.Telemetry), } telemetries.NamespaceToTelemetries[config.Namespace] = append(telemetries.NamespaceToTelemetries[config.Namespace], telemetry) } return telemetries, nil } type metricsConfig struct { ClientMetrics []metricsOverride ServerMetrics []metricsOverride } type telemetryFilterConfig struct { metricsConfig Provider *meshconfig.MeshConfig_ExtensionProvider Metrics bool AccessLogging bool LogsFilter *tpb.AccessLogging_Filter } func (t telemetryFilterConfig) MetricsForClass(c networking.ListenerClass) []metricsOverride { switch c { case networking.ListenerClassGateway: return t.ClientMetrics case networking.ListenerClassSidecarInbound: return t.ServerMetrics case networking.ListenerClassSidecarOutbound: return t.ClientMetrics default: return t.ClientMetrics } } type metricsOverride struct { Name string Disabled bool Tags []tagOverride } type tagOverride struct { Name string Remove bool Value string } // computedTelemetries contains the various Telemetry configurations in scope for a given proxy. // This can include the root namespace, namespace, and workload Telemetries combined type computedTelemetries struct { telemetryKey Metrics []*tpb.Metrics Logging []*tpb.AccessLogging Tracing []*tpb.Tracing } type TracingConfig struct { ServerSpec TracingSpec ClientSpec TracingSpec } type TracingSpec struct { Provider *meshconfig.MeshConfig_ExtensionProvider Disabled bool RandomSamplingPercentage float64 CustomTags map[string]*tpb.Tracing_CustomTag UseRequestIDForTraceSampling bool } type LoggingConfig struct { Providers []*meshconfig.MeshConfig_ExtensionProvider Filter *tpb.AccessLogging_Filter } func workloadMode(class networking.ListenerClass) tpb.WorkloadMode { switch class { case networking.ListenerClassGateway: return tpb.WorkloadMode_CLIENT case networking.ListenerClassSidecarInbound: return tpb.WorkloadMode_SERVER case networking.ListenerClassSidecarOutbound: return tpb.WorkloadMode_CLIENT case networking.ListenerClassUndefined: // this should not happened, just in case return tpb.WorkloadMode_CLIENT } return tpb.WorkloadMode_CLIENT } // AccessLogging returns the logging configuration for a given proxy and listener class. // If nil is returned, access logs are not configured via Telemetry and should use fallback mechanisms. // If a non-nil but empty configuration is passed, access logging is explicitly disabled. func (t *Telemetries) AccessLogging(proxy *Proxy, class networking.ListenerClass) *LoggingConfig { ct := t.applicableTelemetries(proxy) if len(ct.Logging) == 0 && len(t.meshConfig.GetDefaultProviders().GetAccessLogging()) == 0 { return nil } cfg := LoggingConfig{} providers, f := mergeLogs(ct.Logging, t.meshConfig, workloadMode(class)) cfg.Filter = f for _, p := range providers.SortedList() { fp := t.fetchProvider(p) if fp != nil { cfg.Providers = append(cfg.Providers, fp) } } return &cfg } // Tracing returns the logging tracing for a given proxy. If nil is returned, tracing // are not configured via Telemetry and should use fallback mechanisms. If a non-nil but disabled is set, // then tracing is explicitly disabled func (t *Telemetries) Tracing(proxy *Proxy) *TracingConfig { ct := t.applicableTelemetries(proxy) providerNames := t.meshConfig.GetDefaultProviders().GetTracing() hasDefaultProvider := len(providerNames) > 0 if len(ct.Tracing) == 0 && !hasDefaultProvider { return nil } clientSpec := TracingSpec{UseRequestIDForTraceSampling: true} serverSpec := TracingSpec{UseRequestIDForTraceSampling: true} if hasDefaultProvider { // todo: what do we want to do with more than one default provider? // for now, use only the first provider. fetched := t.fetchProvider(providerNames[0]) clientSpec.Provider = fetched serverSpec.Provider = fetched } for _, m := range ct.Tracing { names := getProviderNames(m.Providers) specs := []*TracingSpec{&clientSpec, &serverSpec} if m.Match != nil { switch m.Match.Mode { case tpb.WorkloadMode_CLIENT: specs = []*TracingSpec{&clientSpec} case tpb.WorkloadMode_SERVER: specs = []*TracingSpec{&serverSpec} } } if len(names) > 0 { // NOTE: we only support a single provider per mode // so, choosing the first provider returned in the list // is the "safest" fetched := t.fetchProvider(names[0]) for _, spec := range specs { spec.Provider = fetched } } // Now merge in any overrides if m.DisableSpanReporting != nil { for _, spec := range specs { spec.Disabled = m.DisableSpanReporting.GetValue() } } // TODO: metrics overrides do a deep merge, but here we do a shallow merge. // We should consider if we want to reconcile the two. if m.CustomTags != nil { for _, spec := range specs { spec.CustomTags = m.CustomTags } } if m.RandomSamplingPercentage != nil { for _, spec := range specs { spec.RandomSamplingPercentage = m.RandomSamplingPercentage.GetValue() } } if m.UseRequestIdForTraceSampling != nil { for _, spec := range specs { spec.UseRequestIDForTraceSampling = m.UseRequestIdForTraceSampling.Value } } } // If no provider is configured (and retrieved) for the tracing specs, // then we will disable the configuration. if clientSpec.Provider == nil { clientSpec.Disabled = true } if serverSpec.Provider == nil { serverSpec.Disabled = true } cfg := TracingConfig{ ClientSpec: clientSpec, ServerSpec: serverSpec, } return &cfg } // HTTPFilters computes the HttpFilter for a given proxy/class func (t *Telemetries) HTTPFilters(proxy *Proxy, class networking.ListenerClass) []*hcm.HttpFilter { if res := t.telemetryFilters(proxy, class, networking.ListenerProtocolHTTP); res != nil { return res.([]*hcm.HttpFilter) } return nil } // TCPFilters computes the TCPFilters for a given proxy/class func (t *Telemetries) TCPFilters(proxy *Proxy, class networking.ListenerClass) []*listener.Filter { if res := t.telemetryFilters(proxy, class, networking.ListenerProtocolTCP); res != nil { return res.([]*listener.Filter) } return nil } // applicableTelemetries fetches the relevant telemetry configurations for a given proxy func (t *Telemetries) applicableTelemetries(proxy *Proxy) computedTelemetries { if t == nil { return computedTelemetries{} } namespace := proxy.ConfigNamespace // Order here matters. The latter elements will override the first elements ms := []*tpb.Metrics{} ls := []*tpb.AccessLogging{} ts := []*tpb.Tracing{} key := telemetryKey{} if t.RootNamespace != "" { telemetry := t.namespaceWideTelemetryConfig(t.RootNamespace) if telemetry != (Telemetry{}) { key.Root = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace} ms = append(ms, telemetry.Spec.GetMetrics()...) ls = append(ls, telemetry.Spec.GetAccessLogging()...) ts = append(ts, telemetry.Spec.GetTracing()...) } } if namespace != t.RootNamespace { telemetry := t.namespaceWideTelemetryConfig(namespace) if telemetry != (Telemetry{}) { key.Namespace = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace} ms = append(ms, telemetry.Spec.GetMetrics()...) ls = append(ls, telemetry.Spec.GetAccessLogging()...) ts = append(ts, telemetry.Spec.GetTracing()...) } } for _, telemetry := range t.NamespaceToTelemetries[namespace] { spec := telemetry.Spec if len(spec.GetSelector().GetMatchLabels()) == 0 { continue } selector := labels.Instance(spec.GetSelector().GetMatchLabels()) if selector.SubsetOf(proxy.Metadata.Labels) { key.Workload = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace} ms = append(ms, spec.GetMetrics()...) ls = append(ls, spec.GetAccessLogging()...) ts = append(ts, spec.GetTracing()...) break } } return computedTelemetries{ telemetryKey: key, Metrics: ms, Logging: ls, Tracing: ts, } } // telemetryFilters computes the filters for the given proxy/class and protocol. This computes the // set of applicable Telemetries, merges them, then translates to the appropriate filters based on the // extension providers in the mesh config. Where possible, the result is cached. // Currently, this includes metrics and access logging, as some providers are implemented in filters. func (t *Telemetries) telemetryFilters(proxy *Proxy, class networking.ListenerClass, protocol networking.ListenerProtocol) interface{} { if t == nil { return nil } c := t.applicableTelemetries(proxy) key := metricsKey{ telemetryKey: c.telemetryKey, Class: class, Protocol: protocol, } t.mu.Lock() defer t.mu.Unlock() precomputed, f := t.computedMetricsFilters[key] if f { return precomputed } // First, take all the metrics configs and transform them into a normalized form tmm := mergeMetrics(c.Metrics, t.meshConfig) log.Debugf("merged metrics, proxyID: %s metrics: %+v", proxy.ID, tmm) // Additionally, fetch relevant access logging configurations tml, logsFilter := mergeLogs(c.Logging, t.meshConfig, workloadMode(class)) // The above result is in a nested map to deduplicate responses. This loses ordering, so we convert to // a list to retain stable naming allKeys := sets.New(tml.UnsortedList()...) for k := range tmm { allKeys.Insert(k) } m := make([]telemetryFilterConfig, 0, len(allKeys)) for _, k := range allKeys.SortedList() { p := t.fetchProvider(k) if p == nil { continue } _, logging := tml[k] _, metrics := tmm[k] cfg := telemetryFilterConfig{ Provider: p, metricsConfig: tmm[k], AccessLogging: logging, Metrics: metrics, LogsFilter: logsFilter, } m = append(m, cfg) } var res interface{} // Finally, compute the actual filters based on the protoc switch protocol { case networking.ListenerProtocolHTTP: res = buildHTTPTelemetryFilter(class, m) default: res = buildTCPTelemetryFilter(class, m) } // Update cache t.computedMetricsFilters[key] = res return res } // mergeLogs returns the set of providers for the given logging configuration. func mergeLogs(logs []*tpb.AccessLogging, mesh *meshconfig.MeshConfig, mode tpb.WorkloadMode) (sets.Set, *tpb.AccessLogging_Filter) { providers := sets.New() if len(logs) == 0 { for _, dp := range mesh.GetDefaultProviders().GetAccessLogging() { // Insert the default provider. providers.Insert(dp) } return providers, nil } var loggingFilter *tpb.AccessLogging_Filter providerNames := mesh.GetDefaultProviders().GetAccessLogging() for _, m := range logs { names := getProviderNames(m.Providers) if len(names) > 0 { providerNames = names } if m.Filter != nil { loggingFilter = m.Filter } } inScopeProviders := sets.New(providerNames...) parentProviders := mesh.GetDefaultProviders().GetAccessLogging() for _, m := range logs { providerNames := getProviderNames(m.Providers) if len(providerNames) == 0 { providerNames = parentProviders } parentProviders = providerNames for _, provider := range providerNames { if !inScopeProviders.Contains(provider) { // We don't care about this, remove it // This occurs when a top level provider is later disabled by a lower level continue } if !matchWorkloadMode(m.Match, mode) { continue } if m.GetDisabled().GetValue() { providers.Delete(provider) continue } providers.Insert(provider) } } return providers, loggingFilter } func matchWorkloadMode(selector *tpb.AccessLogging_LogSelector, mode tpb.WorkloadMode) bool { if selector == nil { return true } if selector.Mode == tpb.WorkloadMode_CLIENT_AND_SERVER { return true } return selector.Mode == mode } func (t *Telemetries) namespaceWideTelemetryConfig(namespace string) Telemetry { for _, tel := range t.NamespaceToTelemetries[namespace] { if len(tel.Spec.GetSelector().GetMatchLabels()) == 0 { return tel } } return Telemetry{} } // fetchProvider finds the matching ExtensionProviders from the mesh config func (t *Telemetries) fetchProvider(m string) *meshconfig.MeshConfig_ExtensionProvider { for _, p := range t.meshConfig.ExtensionProviders { if strings.EqualFold(m, p.Name) { return p } } return nil } var allMetrics = func() []string { r := make([]string, 0, len(tpb.MetricSelector_IstioMetric_value)) for k := range tpb.MetricSelector_IstioMetric_value { if k != tpb.MetricSelector_IstioMetric_name[int32(tpb.MetricSelector_ALL_METRICS)] { r = append(r, k) } } sort.Strings(r) return r }() // mergeMetrics merges many Metrics objects into a normalized configuration func mergeMetrics(metrics []*tpb.Metrics, mesh *meshconfig.MeshConfig) map[string]metricsConfig { type metricOverride struct { Disabled *wrappers.BoolValue TagOverrides map[string]*tpb.MetricsOverrides_TagOverride } // provider -> mode -> metric -> overrides providers := map[string]map[tpb.WorkloadMode]map[string]metricOverride{} if len(metrics) == 0 { for _, dp := range mesh.GetDefaultProviders().GetMetrics() { // Insert the default provider. It has no overrides; presence of the key is sufficient to // get the filter created. providers[dp] = map[tpb.WorkloadMode]map[string]metricOverride{} } } providerNames := mesh.GetDefaultProviders().GetMetrics() for _, m := range metrics { names := getProviderNames(m.Providers) // If providers is set, it overrides the parent. If not, inherent from the parent. It is not a deep merge. if len(names) > 0 { providerNames = names } } // Record the names of all providers we should configure. Anything else we will ignore inScopeProviders := sets.New(providerNames...) parentProviders := mesh.GetDefaultProviders().GetMetrics() disabledAllMetricsProviders := sets.New() for _, m := range metrics { providerNames := getProviderNames(m.Providers) // If providers is not set, use parent's if len(providerNames) == 0 { providerNames = parentProviders } parentProviders = providerNames for _, provider := range providerNames { if !inScopeProviders.Contains(provider) { // We don't care about this, remove it // This occurs when a top level provider is later disabled by a lower level continue } if _, f := providers[provider]; !f { providers[provider] = map[tpb.WorkloadMode]map[string]metricOverride{ tpb.WorkloadMode_CLIENT: {}, tpb.WorkloadMode_SERVER: {}, } } mp := providers[provider] // For each override, we normalize the configuration. The metrics list is an ordered list - latter // elements have precedence. As a result, we will apply updates on top of previous entries. for _, o := range m.Overrides { // if we disable all metrics, we should drop the entire filter if isAllMetrics(o.GetMatch()) && o.Disabled.GetValue() { disabledAllMetricsProviders.Insert(provider) continue } // root namespace disables all, but then enables them by namespace scoped disabledAllMetricsProviders.Delete(provider) metricsNames := getMatches(o.GetMatch()) // If client or server is set explicitly, only apply there. Otherwise, we will apply to both. // Note: client and server keys may end up the same, which is fine for _, mode := range getModes(o.GetMatch().GetMode()) { // Next, get all matches. // This is a bit funky because the matches are oneof of ENUM and customer metric. We normalize // these to strings, so we may end up with a list like [REQUEST_COUNT, my-customer-metric]. // TODO: we always flatten ALL_METRICS into each metric mode. For some stats providers (prometheus), // we are able to apply overrides to all metrics directly rather than duplicating the config. // We should tweak this to collapse to this mode where possible for _, metricName := range metricsNames { if _, f := mp[mode]; !f { mp[mode] = map[string]metricOverride{} } override := mp[mode][metricName] if o.Disabled != nil { override.Disabled = o.Disabled } for k, v := range o.TagOverrides { if override.TagOverrides == nil { override.TagOverrides = map[string]*tpb.MetricsOverrides_TagOverride{} } override.TagOverrides[k] = v } mp[mode][metricName] = override } } } } } processed := map[string]metricsConfig{} for provider, modeMap := range providers { if disabledAllMetricsProviders.Contains(provider) { continue } for mode, metricMap := range modeMap { for metric, override := range metricMap { tags := []tagOverride{} for k, v := range override.TagOverrides { o := tagOverride{Name: k} switch v.Operation { case tpb.MetricsOverrides_TagOverride_REMOVE: o.Remove = true o.Value = "" case tpb.MetricsOverrides_TagOverride_UPSERT: o.Value = v.GetValue() o.Remove = false } tags = append(tags, o) } // Keep order deterministic sort.Slice(tags, func(i, j int) bool { return tags[i].Name < tags[j].Name }) mo := metricsOverride{ Name: metric, Disabled: override.Disabled.GetValue(), Tags: tags, } tmm := processed[provider] switch mode { case tpb.WorkloadMode_CLIENT: tmm.ClientMetrics = append(tmm.ClientMetrics, mo) default: tmm.ServerMetrics = append(tmm.ServerMetrics, mo) } processed[provider] = tmm } } // Keep order deterministic tmm := processed[provider] sort.Slice(tmm.ServerMetrics, func(i, j int) bool { return tmm.ServerMetrics[i].Name < tmm.ServerMetrics[j].Name }) sort.Slice(tmm.ClientMetrics, func(i, j int) bool { return tmm.ClientMetrics[i].Name < tmm.ClientMetrics[j].Name }) processed[provider] = tmm } return processed } func getProviderNames(providers []*tpb.ProviderRef) []string { res := make([]string, 0, len(providers)) for _, p := range providers { res = append(res, p.GetName()) } return res } func getModes(mode tpb.WorkloadMode) []tpb.WorkloadMode { switch mode { case tpb.WorkloadMode_CLIENT, tpb.WorkloadMode_SERVER: return []tpb.WorkloadMode{mode} default: return []tpb.WorkloadMode{tpb.WorkloadMode_CLIENT, tpb.WorkloadMode_SERVER} } } func isAllMetrics(match *tpb.MetricSelector) bool { switch m := match.GetMetricMatch().(type) { case *tpb.MetricSelector_CustomMetric: return false case *tpb.MetricSelector_Metric: return m.Metric == tpb.MetricSelector_ALL_METRICS default: return false } } func getMatches(match *tpb.MetricSelector) []string { switch m := match.GetMetricMatch().(type) { case *tpb.MetricSelector_CustomMetric: return []string{m.CustomMetric} case *tpb.MetricSelector_Metric: if m.Metric == tpb.MetricSelector_ALL_METRICS { return allMetrics } return []string{m.Metric.String()} default: return allMetrics } } func statsRootIDForClass(class networking.ListenerClass) string { switch class { case networking.ListenerClassSidecarInbound: return "stats_inbound" default: return "stats_outbound" } } func buildHTTPTelemetryFilter(class networking.ListenerClass, metricsCfg []telemetryFilterConfig) []*hcm.HttpFilter { res := make([]*hcm.HttpFilter, 0, len(metricsCfg)) for _, cfg := range metricsCfg { switch cfg.Provider.GetProvider().(type) { case *meshconfig.MeshConfig_ExtensionProvider_Prometheus: if !cfg.Metrics { // No logging for prometheus continue } statsCfg := generateStatsConfig(class, cfg) vmConfig := ConstructVMConfig("/etc/istio/extensions/stats-filter.compiled.wasm", "envoy.wasm.stats") root := statsRootIDForClass(class) vmConfig.VmConfig.VmId = root wasmConfig := &httpwasm.Wasm{ Config: &wasm.PluginConfig{ RootId: root, Vm: vmConfig, Configuration: statsCfg, }, } f := &hcm.HttpFilter{ Name: xds.StatsFilterName, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)}, } res = append(res, f) case *meshconfig.MeshConfig_ExtensionProvider_Stackdriver: sdCfg := generateSDConfig(class, cfg) vmConfig := ConstructVMConfig("", "envoy.wasm.null.stackdriver") vmConfig.VmConfig.VmId = stackdriverVMID(class) wasmConfig := &httpwasm.Wasm{ Config: &wasm.PluginConfig{ RootId: vmConfig.VmConfig.VmId, Vm: vmConfig, Configuration: sdCfg, }, } f := &hcm.HttpFilter{ Name: xds.StackdriverFilterName, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)}, } res = append(res, f) default: // Only prometheus and SD supported currently continue } } return res } func buildTCPTelemetryFilter(class networking.ListenerClass, telemetryConfigs []telemetryFilterConfig) []*listener.Filter { res := []*listener.Filter{} for _, telemetryCfg := range telemetryConfigs { switch telemetryCfg.Provider.GetProvider().(type) { case *meshconfig.MeshConfig_ExtensionProvider_Prometheus: cfg := generateStatsConfig(class, telemetryCfg) vmConfig := ConstructVMConfig("/etc/istio/extensions/stats-filter.compiled.wasm", "envoy.wasm.stats") root := statsRootIDForClass(class) vmConfig.VmConfig.VmId = "tcp_" + root wasmConfig := &wasmfilter.Wasm{ Config: &wasm.PluginConfig{ RootId: root, Vm: vmConfig, Configuration: cfg, }, } f := &listener.Filter{ Name: xds.StatsFilterName, ConfigType: &listener.Filter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)}, } res = append(res, f) case *meshconfig.MeshConfig_ExtensionProvider_Stackdriver: cfg := generateSDConfig(class, telemetryCfg) vmConfig := ConstructVMConfig("", "envoy.wasm.null.stackdriver") vmConfig.VmConfig.VmId = stackdriverVMID(class) wasmConfig := &wasmfilter.Wasm{ Config: &wasm.PluginConfig{ RootId: vmConfig.VmConfig.VmId, Vm: vmConfig, Configuration: cfg, }, } f := &listener.Filter{ Name: xds.StackdriverFilterName, ConfigType: &listener.Filter_TypedConfig{TypedConfig: networking.MessageToAny(wasmConfig)}, } res = append(res, f) default: // Only prometheus and SD supported currently continue } } return res } func stackdriverVMID(class networking.ListenerClass) string { switch class { case networking.ListenerClassSidecarInbound: return "stackdriver_inbound" default: return "stackdriver_outbound" } } var metricToSDServerMetrics = map[string]string{ "REQUEST_COUNT": "server/request_count", "REQUEST_DURATION": "server/response_latencies", "REQUEST_SIZE": "server/request_bytes", "RESPONSE_SIZE": "server/response_bytes", "TCP_OPENED_CONNECTIONS": "server/connection_open_count", "TCP_CLOSED_CONNECTIONS": "server/connection_close_count", "TCP_SENT_BYTES": "server/sent_bytes_count", "TCP_RECEIVED_BYTES": "server/received_bytes_count", "GRPC_REQUEST_MESSAGES": "", "GRPC_RESPONSE_MESSAGES": "", } var metricToSDClientMetrics = map[string]string{ "REQUEST_COUNT": "client/request_count", "REQUEST_DURATION": "client/response_latencies", "REQUEST_SIZE": "client/request_bytes", "RESPONSE_SIZE": "client/response_bytes", "TCP_OPENED_CONNECTIONS": "client/connection_open_count", "TCP_CLOSED_CONNECTIONS": "client/connection_close_count", "TCP_SENT_BYTES": "client/sent_bytes_count", "TCP_RECEIVED_BYTES": "client/received_bytes_count", "GRPC_REQUEST_MESSAGES": "", "GRPC_RESPONSE_MESSAGES": "", } // used for CEL expressions in stackdriver serialization var jsonUnescaper = strings.NewReplacer(`\u003e`, `>`, `\u003c`, `<`, `\u0026`, `&`) func generateSDConfig(class networking.ListenerClass, telemetryConfig telemetryFilterConfig) *anypb.Any { cfg := sd.PluginConfig{ DisableHostHeaderFallback: disableHostHeaderFallback(class), } metricNameMap := metricToSDClientMetrics if class == networking.ListenerClassSidecarInbound { metricNameMap = metricToSDServerMetrics } for _, override := range telemetryConfig.MetricsForClass(class) { metricName, f := metricNameMap[override.Name] if !f { // Not a predefined metric, must be a custom one metricName = override.Name } if metricName == "" { continue } if cfg.MetricsOverrides == nil { cfg.MetricsOverrides = map[string]*sd.MetricsOverride{} } if _, f := cfg.MetricsOverrides[metricName]; !f { cfg.MetricsOverrides[metricName] = &sd.MetricsOverride{} } cfg.MetricsOverrides[metricName].Drop = override.Disabled for _, t := range override.Tags { if t.Remove { // Remove is not supported by SD continue } if cfg.MetricsOverrides[metricName].TagOverrides == nil { cfg.MetricsOverrides[metricName].TagOverrides = map[string]string{} } cfg.MetricsOverrides[metricName].TagOverrides[t.Name] = t.Value } } if telemetryConfig.AccessLogging { if telemetryConfig.LogsFilter != nil { cfg.AccessLoggingFilterExpression = telemetryConfig.LogsFilter.Expression } else { if class == networking.ListenerClassSidecarInbound { cfg.AccessLogging = sd.PluginConfig_FULL } else { // this can be achieved via CEL: `response.code >= 400 || response.code == 0` cfg.AccessLogging = sd.PluginConfig_ERRORS_ONLY } } } else { // The field is deprecated, but until it is removed we need to set it. cfg.DisableServerAccessLogging = true // nolint: staticcheck } cfg.MetricExpiryDuration = durationpb.New(1 * time.Hour) // In WASM we are not actually processing protobuf at all, so we need to encode this to JSON cfgJSON, _ := protomarshal.MarshalProtoNames(&cfg) // MarshalProtoNames() forces HTML-escaped JSON encoding. // this can be problematic for CEL expressions, particularly those using // '>', '<', and '&'s. It is easier to use replaceAll operations than it is // to mimic MarshalProtoNames() with configured JSON Encoder. pb := &wrappers.StringValue{Value: jsonUnescaper.Replace(string(cfgJSON))} return networking.MessageToAny(pb) } var metricToPrometheusMetric = map[string]string{ "REQUEST_COUNT": "requests_total", "REQUEST_DURATION": "request_duration_milliseconds", "REQUEST_SIZE": "request_bytes", "RESPONSE_SIZE": "response_bytes", "TCP_OPENED_CONNECTIONS": "tcp_connections_opened_total", "TCP_CLOSED_CONNECTIONS": "tcp_connections_closed_total", "TCP_SENT_BYTES": "tcp_sent_bytes_total", "TCP_RECEIVED_BYTES": "tcp_received_bytes_total", "GRPC_REQUEST_MESSAGES": "request_messages_total", "GRPC_RESPONSE_MESSAGES": "response_messages_total", } func generateStatsConfig(class networking.ListenerClass, metricsCfg telemetryFilterConfig) *anypb.Any { cfg := stats.PluginConfig{ DisableHostHeaderFallback: disableHostHeaderFallback(class), } for _, override := range metricsCfg.MetricsForClass(class) { metricName, f := metricToPrometheusMetric[override.Name] if !f { // Not a predefined metric, must be a custom one metricName = override.Name } mc := &stats.MetricConfig{ Dimensions: map[string]string{}, Name: metricName, Drop: override.Disabled, } for _, t := range override.Tags { if t.Remove { mc.TagsToRemove = append(mc.TagsToRemove, t.Name) } else { mc.Dimensions[t.Name] = t.Value } } cfg.Metrics = append(cfg.Metrics, mc) } // In WASM we are not actually processing protobuf at all, so we need to encode this to JSON cfgJSON, _ := protomarshal.MarshalProtoNames(&cfg) return networking.MessageToAny(&wrappers.StringValue{Value: string(cfgJSON)}) } func disableHostHeaderFallback(class networking.ListenerClass) bool { return class == networking.ListenerClassSidecarInbound || class == networking.ListenerClassGateway }