extension/jaegerremotesampling/internal/source/filesource/filesource.go (305 lines of code) (raw):

// Copyright The OpenTelemetry Authors // Copyright (c) 2018 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package filesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" import ( "bytes" "context" "encoding/json" "fmt" "net/http" "net/url" "os" "path/filepath" "sync" "sync/atomic" "time" "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" ) // null represents "null" JSON value and // it un-marshals to nil pointer. var nullJSON = []byte("null") type samplingProvider struct { logger *zap.Logger storedStrategies atomic.Value // holds *storedStrategies cancelFunc context.CancelFunc options Options wg sync.WaitGroup } type storedStrategies struct { defaultStrategy *api_v2.SamplingStrategyResponse serviceStrategies map[string]*api_v2.SamplingStrategyResponse } type strategyLoader func() ([]byte, error) // NewFileSource creates a strategy store that holds static sampling strategies. func NewFileSource(options Options, logger *zap.Logger) (source.Source, error) { ctx, cancelFunc := context.WithCancel(context.Background()) h := &samplingProvider{ logger: logger, cancelFunc: cancelFunc, options: options, } h.storedStrategies.Store(defaultStrategies()) if options.StrategiesFile == "" { h.logger.Info("No sampling strategies source provided, using defaults") return h, nil } loadFn := h.samplingStrategyLoader(options.StrategiesFile) strategies, err := loadStrategies(loadFn) if err != nil { return nil, err } else if strategies == nil { h.logger.Info("No sampling strategies found or URL is unavailable, using defaults") return h, nil } if !h.options.IncludeDefaultOpStrategies { h.logger.Warn("Default operations level strategies will not be included for Ratelimiting service strategies." + "This behavior will be changed in future releases. " + "Cf. https://github.com/jaegertracing/jaeger/issues/5270") h.parseStrategiesDeprecated(strategies) } else { h.parseStrategies(strategies) } if options.ReloadInterval > 0 { h.wg.Add(1) go h.autoUpdateStrategies(ctx, options.ReloadInterval, loadFn) } return h, nil } // GetSamplingStrategy implements StrategyStore#GetSamplingStrategy. func (h *samplingProvider) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { storedStrategies := h.storedStrategies.Load().(*storedStrategies) serviceStrategies := storedStrategies.serviceStrategies if strategy, ok := serviceStrategies[serviceName]; ok { return strategy, nil } h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName)) return storedStrategies.defaultStrategy, nil } // Close stops updating the strategies func (h *samplingProvider) Close() error { h.cancelFunc() h.wg.Wait() return nil } func (h *samplingProvider) downloadSamplingStrategies(samplingURL string) ([]byte, error) { h.logger.Info("Downloading sampling strategies", zap.String("url", samplingURL)) ctx, cx := context.WithTimeout(context.Background(), time.Second) defer cx() req, err := http.NewRequestWithContext(ctx, http.MethodGet, samplingURL, nil) if err != nil { return nil, fmt.Errorf("cannot construct HTTP request: %w", err) } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to download sampling strategies: %w", err) } defer resp.Body.Close() buf := new(bytes.Buffer) if _, err = buf.ReadFrom(resp.Body); err != nil { return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err) } if resp.StatusCode == http.StatusServiceUnavailable { return nullJSON, nil } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf( "receiving %s while downloading strategies file: %s", resp.Status, buf.String(), ) } return buf.Bytes(), nil } func isURL(str string) bool { u, err := url.Parse(str) return err == nil && u.Scheme != "" && u.Host != "" } func (h *samplingProvider) samplingStrategyLoader(strategiesFile string) strategyLoader { if isURL(strategiesFile) { return func() ([]byte, error) { return h.downloadSamplingStrategies(strategiesFile) } } return func() ([]byte, error) { h.logger.Info("Loading sampling strategies", zap.String("filename", strategiesFile)) currBytes, err := os.ReadFile(filepath.Clean(strategiesFile)) if err != nil { return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err) } return currBytes, nil } } func (h *samplingProvider) autoUpdateStrategies(ctx context.Context, interval time.Duration, loader strategyLoader) { defer h.wg.Done() lastValue := string(nullJSON) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: lastValue = h.reloadSamplingStrategy(loader, lastValue) case <-ctx.Done(): return } } } func (h *samplingProvider) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { newValue, err := loadFn() if err != nil { h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) return lastValue } if lastValue == string(newValue) { return lastValue } if err := h.updateSamplingStrategy(newValue); err != nil { h.logger.Error("failed to update sampling strategies", zap.Error(err)) return lastValue } return string(newValue) } func (h *samplingProvider) updateSamplingStrategy(dataBytes []byte) error { var strategies strategies if err := json.Unmarshal(dataBytes, &strategies); err != nil { return fmt.Errorf("failed to unmarshal sampling strategies: %w", err) } h.parseStrategies(&strategies) h.logger.Info("Updated sampling strategies:" + string(dataBytes)) return nil } // TODO good candidate for a global util function func loadStrategies(loadFn strategyLoader) (*strategies, error) { strategyBytes, err := loadFn() if err != nil { return nil, err } var strategies *strategies if err := json.Unmarshal(strategyBytes, &strategies); err != nil { return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) } return strategies, nil } func (h *samplingProvider) parseStrategiesDeprecated(strategies *strategies) { newStore := defaultStrategies() if strategies.DefaultStrategy != nil { newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) } merge := newStore.defaultStrategy.OperationSampling != nil && newStore.defaultStrategy.OperationSampling.PerOperationStrategies != nil for _, s := range strategies.ServiceStrategies { newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) // Merge with the default operation strategies, because only merging with // the default strategy has no effect on service strategies (the default strategy // is not merged with and only used as a fallback). opS := newStore.serviceStrategies[s.Service].OperationSampling if opS == nil { if newStore.defaultStrategy.OperationSampling == nil || newStore.serviceStrategies[s.Service].ProbabilisticSampling == nil { continue } // Service has no per-operation strategies, so just reference the default settings and change default samplingRate. newOpS := *newStore.defaultStrategy.OperationSampling newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate newStore.serviceStrategies[s.Service].OperationSampling = &newOpS continue } if merge { opS.PerOperationStrategies = mergePerOperationSamplingStrategies( opS.PerOperationStrategies, newStore.defaultStrategy.OperationSampling.PerOperationStrategies) } } h.storedStrategies.Store(newStore) } func (h *samplingProvider) parseStrategies(strategies *strategies) { newStore := defaultStrategies() if strategies.DefaultStrategy != nil { newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) } for _, s := range strategies.ServiceStrategies { newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) // Config for this service may not have per-operation strategies, // but if the default strategy has them they should still apply. if newStore.defaultStrategy.OperationSampling == nil { // Default strategy doens't have them either, nothing to do. continue } opS := newStore.serviceStrategies[s.Service].OperationSampling if opS == nil { // Service does not have its own per-operation rules, so copy (by value) from the default strategy. newOpS := *newStore.defaultStrategy.OperationSampling // If the service's own default is probabilistic, then its sampling rate should take precedence. if newStore.serviceStrategies[s.Service].ProbabilisticSampling != nil { newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate } newStore.serviceStrategies[s.Service].OperationSampling = &newOpS continue } // If the service did have its own per-operation strategies, then merge them with the default ones. opS.PerOperationStrategies = mergePerOperationSamplingStrategies( opS.PerOperationStrategies, newStore.defaultStrategy.OperationSampling.PerOperationStrategies) } h.storedStrategies.Store(newStore) } // mergePerOperationSamplingStrategies merges two operation strategies a and b, where a takes precedence over b. func mergePerOperationSamplingStrategies( a, b []*api_v2.OperationSamplingStrategy, ) []*api_v2.OperationSamplingStrategy { m := make(map[string]bool) for _, aOp := range a { m[aOp.Operation] = true } for _, bOp := range b { if m[bOp.Operation] { continue } a = append(a, bOp) } return a } func (h *samplingProvider) parseServiceStrategies(strategy *serviceStrategy) *api_v2.SamplingStrategyResponse { resp := h.parseStrategy(&strategy.strategy) if len(strategy.OperationStrategies) == 0 { return resp } opS := &api_v2.PerOperationSamplingStrategies{ DefaultSamplingProbability: defaultSamplingProbability, } if resp.StrategyType == api_v2.SamplingStrategyType_PROBABILISTIC { opS.DefaultSamplingProbability = resp.ProbabilisticSampling.SamplingRate } for _, operationStrategy := range strategy.OperationStrategies { s, ok := h.parseOperationStrategy(operationStrategy, opS) if !ok { continue } opS.PerOperationStrategies = append(opS.PerOperationStrategies, &api_v2.OperationSamplingStrategy{ Operation: operationStrategy.Operation, ProbabilisticSampling: s.ProbabilisticSampling, }) } resp.OperationSampling = opS return resp } func (h *samplingProvider) parseOperationStrategy( strategy *operationStrategy, parent *api_v2.PerOperationSamplingStrategies, ) (s *api_v2.SamplingStrategyResponse, ok bool) { s = h.parseStrategy(&strategy.strategy) if s.StrategyType == api_v2.SamplingStrategyType_RATE_LIMITING { // TODO OperationSamplingStrategy only supports probabilistic sampling h.logger.Warn( fmt.Sprintf( "Operation strategies only supports probabilistic sampling at the moment,"+ "'%s' defaulting to probabilistic sampling with probability %f", strategy.Operation, parent.DefaultSamplingProbability), zap.Any("strategy", strategy)) return nil, false } return s, true } func (h *samplingProvider) parseStrategy(strategy *strategy) *api_v2.SamplingStrategyResponse { switch strategy.Type { case samplerTypeProbabilistic: return &api_v2.SamplingStrategyResponse{ StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ SamplingRate: strategy.Param, }, } case samplerTypeRateLimiting: return &api_v2.SamplingStrategyResponse{ StrategyType: api_v2.SamplingStrategyType_RATE_LIMITING, RateLimitingSampling: &api_v2.RateLimitingSamplingStrategy{ MaxTracesPerSecond: int32(strategy.Param), }, } default: h.logger.Warn("Failed to parse sampling strategy", zap.Any("strategy", strategy)) return defaultStrategyResponse() } }