pkg/profiling/continuous/trigger/network.go (90 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package trigger import ( "fmt" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/profiling/continuous/base" "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common" profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3" ) type NetworkTrigger struct { *BaseTrigger } func NewNetworkTrigger() base.Trigger { return &NetworkTrigger{} } func (n *NetworkTrigger) Init(moduleMgr *module.Manager, conf *base.ContinuousConfig) error { n.BaseTrigger = NewMultipleProcessBasedTrigger(conf, func(p api.ProcessInterface) string { // same instance entity := p.Entity() return fmt.Sprintf("%s_%s", entity.ServiceName, entity.InstanceName) }, func(ps []api.ProcessInterface) api.ProcessInterface { var mainApplication api.ProcessInterface for _, p := range ps { // for service mesh, find the application if processHasLabel(p, "mesh-application") { mainApplication = p } // otherwise, find the process belong a kubernetes service if mainApplication == nil && processHasLabel(p, "k8s-service") { mainApplication = p } } if mainApplication != nil { return mainApplication } return ps[0] }, func(task *profiling.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause) { task.TargetType = profiling.TargetTypeNetworkTopology task.ExtensionConfig = &profiling.ExtensionConfig{ NetworkSamplings: transformCausesToNetworkSamplingRules(thresholds), } }, func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause) { rules := transformCausesToNetworkSamplingRules(thresholds) uriRegexes := make([]string, 0) if len(rules) > 0 { for _, r := range rules { uriRegexes = append(uriRegexes, *r.URIRegex) } } report.TargetTask = &v3.ContinuousProfilingReport_Network{ Network: &v3.ContinuousNetworkProfilingTask{ SamplingURIRegexes: uriRegexes, }, } }) return n.BaseTrigger.Init(conf) } func processHasLabel(p api.ProcessInterface, label string) bool { for _, l := range p.Entity().Labels { if l == label { return true } } return false } func transformCausesToNetworkSamplingRules(thresholds []base.ThresholdCause) []*profiling.NetworkSamplingRule { result := make([]*profiling.NetworkSamplingRule, 0) var minDuration int32 = 0 for _, threshold := range thresholds { uriCause, ok := threshold.(*common.URICause) if !ok { continue } // collecting all request and response rule := &profiling.NetworkSamplingRule{ URIRegex: &uriCause.URI, MinDuration: &minDuration, When4XX: true, When5XX: true, Settings: &profiling.NetworkDataCollectingSettings{ RequireCompleteRequest: true, MaxRequestSize: -1, RequireCompleteResponse: true, MaxResponseSize: -1, }, } if uriCause.URI == "" { return []*profiling.NetworkSamplingRule{rule} } result = append(result, rule) } return result }