// pkg/process/finders/storage.go

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package finders

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/go-multierror"

	v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/process/v3"

	"github.com/apache/skywalking-rover/pkg/core"
	"github.com/apache/skywalking-rover/pkg/module"
	"github.com/apache/skywalking-rover/pkg/process/api"
	"github.com/apache/skywalking-rover/pkg/process/finders/base"
	"github.com/apache/skywalking-rover/pkg/tools"
)

// ProcessStorage caches every process detected by the configured finders,
// periodically reports them (or keeps them alive) on the backend via gRPC,
// and fans out add/delete events to the registered process listeners.
//
// NOTE(review): s.mutex guards s.processes for the writers
// (SyncAllProcessInFinder, reportAllProcesses), but the read-side methods
// (GetAllProcesses, FindAllRegisteredProcesses, FindProcessByID,
// FindProcessByPID, notifyToRecheckAllProcesses) iterate the map without the
// lock — a data race if they run concurrently with a writer. Locking the
// readers is NOT a drop-in fix: eventBuilder.Send pushes into the bounded
// eventQueue while the mutex is held, and the consuming goroutine's listener
// callbacks may call the readers back, which could deadlock. Needs a
// restructuring (send events after unlock) rather than extra locks — verify
// against the callers before changing.
type ProcessStorage struct {
	// processes holds the current process contexts, grouped by the finder
	// (detect type) that discovered them.
	processes map[api.ProcessDetectType][]*ProcessContext
	mutex     sync.Mutex

	// process listeners
	// NOTE(review): listeners is appended by AddListener / rebuilt by
	// DeleteListener on caller goroutines while the StartReport event
	// goroutine reads it — unsynchronized; confirm callers' goroutines.
	listeners               []api.ProcessListener
	eventQueue              chan *processEvent       // buffered (100) add/delete events for the listener goroutine
	initListenQueue         chan api.ProcessListener // buffered (100) newly added listeners awaiting an initial recheck
	listenerRecheckInterval time.Duration            // period of the full-recheck notification

	// working with backend
	reportInterval         time.Duration // period of reportAllProcesses
	propertiesReportFactor int           // every Nth cycle forces a full properties re-report
	roverID                string        // this rover instance ID, sent as EbpfAgentID
	processClient          v3.EBPFProcessServiceClient
	finders                map[api.ProcessDetectType]base.ProcessFinder
	reportedCount          int64 // report-cycle counter, bumped atomically each cycle

	// report context
	ctx    context.Context
	cancel context.CancelFunc
}

// NewProcessStorage wires the storage to the core module (instance ID and
// backend gRPC connection), indexes the finders by detect type, and derives a
// cancellable context for the background report loops. It does not start any
// goroutine; call StartReport for that.
func NewProcessStorage(ctx context.Context, moduleManager *module.Manager, reportInterval time.Duration,
	propertiesReportFactor int, finderList []base.ProcessFinder, listenerRecheckInterval time.Duration) (*ProcessStorage, error) {
	data := make(map[api.ProcessDetectType][]*ProcessContext)
	// working with core module
	coreOperator := moduleManager.FindModule(core.ModuleName).(core.Operator)
	roverID := coreOperator.InstanceID()
	backendConn := coreOperator.BackendOperator().GetConnection()
	processClient := v3.NewEBPFProcessServiceClient(backendConn)
	ctx, cancel := context.WithCancel(ctx)
	fs := make(map[api.ProcessDetectType]base.ProcessFinder)
	for _, f := range finderList {
		fs[f.DetectType()] = f
	}
	return &ProcessStorage{
		processes:               data,
		reportInterval:          reportInterval,
		propertiesReportFactor:  propertiesReportFactor,
		eventQueue:              make(chan *processEvent, 100),
		initListenQueue:         make(chan api.ProcessListener, 100),
		listenerRecheckInterval: listenerRecheckInterval,
		reportedCount:           0,
		roverID:                 roverID,
		processClient:           processClient,
		finders:                 fs,
		ctx:                     ctx,
		cancel:                  cancel,
	}, nil
}

// StartReport launches the two background goroutines: one that reports all
// processes to the backend every reportInterval, and one that serves the
// listeners — periodic full rechecks, queued add/delete events, and the
// initial recheck for freshly registered listeners. Both stop when the
// storage context is cancelled (see StopReport).
func (s *ProcessStorage) StartReport() {
	// for report all processes
	go func() {
		timeTicker := time.NewTicker(s.reportInterval)
		for {
			select {
			case <-timeTicker.C:
				if err := s.reportAllProcesses(); err != nil {
					log.Errorf("report all processes error: %v", err)
				}
			case <-s.ctx.Done():
				timeTicker.Stop()
				return
			}
		}
	}()

	// for start listener
	go func() {
		timeTicker := time.NewTicker(s.listenerRecheckInterval)
		for {
			select {
			case <-timeTicker.C:
				s.notifyToRecheckAllProcesses(s.listeners)
			case e := <-s.eventQueue:
				s.consumeProcessEvent(s.listeners, e)
			case l := <-s.initListenQueue:
				// a newly added listener gets an immediate one-off recheck
				s.notifyToRecheckAllProcesses([]api.ProcessListener{l})
			case <-s.ctx.Done():
				timeTicker.Stop()
				return
			}
		}
	}()
}

// StopReport cancels the storage context, which terminates both goroutines
// started by StartReport. Always returns nil.
func (s *ProcessStorage) StopReport() error {
	s.cancel()
	return nil
}

// reportAllProcesses splits the known processes into "not yet reported"
// (full report) and "already reported" (keep-alive) sets, and sends both to
// the backend. Every propertiesReportFactor-th cycle, keep-alive processes
// are promoted to a full report so their properties are refreshed upstream.
// Errors from the two RPC paths are combined with multierror.
//
// NOTE(review): the mutex is held for the whole method, including both gRPC
// calls — SyncAllProcessInFinder blocks on network I/O duration each cycle.
func (s *ProcessStorage) reportAllProcesses() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if len(s.processes) == 0 {
		return nil
	}

	// build process list(wait report or keep alive)
	waitReportProcesses := make([]*ProcessContext, 0)
	keepAliveProcesses := make([]*ProcessContext, 0)
	for _, finderProcesses := range s.processes {
		for _, p := range finderProcesses {
			if p.syncStatus == NotReport {
				waitReportProcesses = append(waitReportProcesses, p)
			} else if p.syncStatus == ReportSuccess {
				keepAliveProcesses = append(keepAliveProcesses, p)
			}
		}
	}

	// if rover should report the properties, then need to force remove all keep alive processes to report
	shouldReportProperties := atomic.AddInt64(&s.reportedCount, 1)%int64(s.propertiesReportFactor) == 0
	if shouldReportProperties {
		log.Infof("detection has reached the properties report factor, forced to report all processes properties")
		waitReportProcesses = append(waitReportProcesses, keepAliveProcesses...)
		keepAliveProcesses = make([]*ProcessContext, 0)
	}

	var result error
	if err := s.processesReport(waitReportProcesses); err != nil {
		result = multierror.Append(result, err)
	}
	if err := s.processesKeepAlive(keepAliveProcesses); err != nil {
		result = multierror.Append(result, err)
	}
	return result
}

// processesKeepAlive pings the backend with the entity metadata (and the
// finder's "necessary" properties) of every already-registered process.
// A process whose backend id is missing is flipped back to NotReport so the
// next cycle performs a full report instead.
func (s *ProcessStorage) processesKeepAlive(waitKeepAliveProcess []*ProcessContext) error {
	if len(waitKeepAliveProcess) == 0 {
		return nil
	}
	processIDList := make([]*v3.EBPFProcessPingPkg, 0)
	for _, ps := range waitKeepAliveProcess {
		if ps.id == "" {
			log.Warnf("the process id is not found before keep alive, need to report, pid: %d, process name: %s",
				ps.Pid(), ps.Entity().ProcessName)
			ps.syncStatus = NotReport
			continue
		}
		processIDList = append(processIDList, &v3.EBPFProcessPingPkg{
			EntityMetadata: &v3.EBPFProcessEntityMetadata{
				Layer:        ps.Entity().Layer,
				ServiceName:  ps.Entity().ServiceName,
				InstanceName: ps.Entity().InstanceName,
				ProcessName:  ps.Entity().ProcessName,
				Labels:       ps.Entity().Labels,
			},
			Properties: s.finders[ps.detectType].BuildNecessaryProperties(ps.detectProcess),
		})
	}
	_, err := s.processClient.KeepAlive(s.ctx, &v3.EBPFProcessPingPkgList{
		EbpfAgentID: s.roverID,
		Processes:   processIDList,
	})
	return err
}

// processesReport uploads full process properties to the backend, then
// matches each waiting process against the backend's response via the
// finder's ParseProcessID: matched processes are marked ReportSuccess (and
// an "add" event is queued for listeners); unmatched ones are marked Ignore.
func (s *ProcessStorage) processesReport(waitReportProcesses []*ProcessContext) error {
	if len(waitReportProcesses) == 0 {
		return nil
	}
	properties := make([]*v3.EBPFProcessProperties, 0)
	buildContext := &base.BuildEBPFProcessContext{}
	buildContext.HostIP = tools.DefaultHostIPAddress()
	for _, ps := range waitReportProcesses {
		properties = append(properties, s.finders[ps.DetectType()].BuildEBPFProcess(buildContext, ps.detectProcess))
	}
	processes, err := s.processClient.ReportProcesses(s.ctx, &v3.EBPFProcessReportList{Processes: properties, EbpfAgentID: s.roverID})
	if err != nil {
		return err
	}
	eventBuilder := s.newProcessEventBuilder(ProcessOperateAdd)
	for _, waitProcess := range waitReportProcesses {
		found := false
		for _, reportedProcess := range processes.GetProcesses() {
			// the finder decides whether this backend entry belongs to the
			// local process; an empty id means "not mine"
			id := s.finders[waitProcess.DetectType()].ParseProcessID(waitProcess.detectProcess, reportedProcess)
			if id == "" {
				continue
			}
			s.updateProcessToUploadSuccess(waitProcess, id)
			found = true
			eventBuilder.AddProcess(waitProcess.Pid(), waitProcess)
			break
		}
		if !found {
			s.updateProcessToUploadIgnored(waitProcess)
		}
	}
	// NOTE(review): Send runs while reportAllProcesses still holds s.mutex;
	// if eventQueue (cap 100) is full this blocks under the lock — confirm
	// the consumer can never need the lock, otherwise this can deadlock.
	eventBuilder.Send()
	return nil
}

// SyncAllProcessInFinder replaces the process list of one finder with the
// freshly detected set. Existing contexts are kept (and their sync status
// preserved) when pid and entity match; brand-new processes get a NotReport
// context; processes missing from the new set are treated as dead and a
// "delete" event is queued for the listeners.
func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, processes []base.DetectedProcess) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	newProcesses := make([]*ProcessContext, 0)
	existingProcesses := s.processes[finder]
	// tracks which existing contexts were matched by the new detection round
	existingProcessHasFounded := make(map[*ProcessContext]bool)
	for _, p := range existingProcesses {
		existingProcessHasFounded[p] = false
	}
	for _, syncProcess := range processes {
		founded := false
		for _, existingProcess := range existingProcesses {
			if syncProcess.Pid() == existingProcess.Pid() && syncProcess.Entity().SameWith(existingProcess.Entity()) {
				newProcesses = append(newProcesses, existingProcess)
				existingProcessHasFounded[existingProcess] = true
				founded = true
				break
			}
		}
		// if not found in existing processes, need to add this process
		if !founded {
			newProcesses = append(newProcesses, s.constructNewProcessContext(finder, syncProcess))
			log.Infof("detected new process: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity())
		}
	}

	// log the dead processes
	eventBuilder := s.newProcessEventBuilder(ProcessOperateDelete)
	for p, found := range existingProcessHasFounded {
		if found {
			continue
		}
		log.Infof("the process has been recognized as dead, so deleted. pid: %d, entity: %s, id: %s",
			p.Pid(), p.Entity(), p.id)
		eventBuilder.AddProcess(p.Pid(), p)
	}

	s.processes[finder] = newProcesses
	// NOTE(review): sends into the bounded eventQueue while holding s.mutex
	// (unlock is deferred) — same blocking/deadlock hazard as processesReport.
	eventBuilder.Send()
}

// constructNewProcessContext wraps a freshly detected process into a
// ProcessContext with NotReport status and its exposed ports indexed as a set.
func (s *ProcessStorage) constructNewProcessContext(finder api.ProcessDetectType, process base.DetectedProcess) *ProcessContext {
	exporsedPorts := make(map[int]bool)
	for _, p := range process.ExposePorts() {
		exporsedPorts[p] = true
	}
	return &ProcessContext{
		syncStatus:    NotReport,
		detectProcess: process,
		detectType:    finder,
		exposedPorts:  exporsedPorts,
	}
}

// updateProcessToUploadSuccess stores the backend-assigned id and marks the
// process ReportSuccess; the upload is only logged when the id actually
// changed (i.e. first registration or re-registration under a new id).
func (s *ProcessStorage) updateProcessToUploadSuccess(pc *ProcessContext, id string) {
	reported := pc.id == id
	pc.id = id
	pc.syncStatus = ReportSuccess
	if !reported {
		log.Infof("uploaded process pid: %d, name: %s, id: %s",
			pc.detectProcess.Pid(), pc.detectProcess.Entity().ProcessName, id)
	}
}

// updateProcessToUploadIgnored marks a process the backend did not
// acknowledge as Ignore, excluding it from future report/keep-alive cycles.
func (s *ProcessStorage) updateProcessToUploadIgnored(pc *ProcessContext) {
	pc.syncStatus = Ignore
	log.Infof("could not found the process id from upstream, pid: %d, entity: %v", pc.Pid(), pc.Entity())
}

// GetAllProcesses returns every cached process regardless of sync status.
// NOTE(review): iterates s.processes without holding s.mutex — races with
// the writers; see the type-level note before adding a lock here.
func (s *ProcessStorage) GetAllProcesses() []api.ProcessInterface {
	result := make([]api.ProcessInterface, 0)
	for _, processList := range s.processes {
		for _, p := range processList {
			result = append(result, p)
		}
	}
	return result
}

// FindAllRegisteredProcesses returns only the processes the backend has
// acknowledged (syncStatus == ReportSuccess).
// NOTE(review): same unlocked read of s.processes as GetAllProcesses.
func (s *ProcessStorage) FindAllRegisteredProcesses() []api.ProcessInterface {
	result := make([]api.ProcessInterface, 0)
	for _, processList := range s.processes {
		for _, p := range processList {
			if p.syncStatus == ReportSuccess {
				result = append(result, p)
			}
		}
	}
	return result
}

// FindProcessByID looks a process up by its backend-assigned id; returns nil
// when no process carries that id (e.g. not yet reported).
// NOTE(review): same unlocked read of s.processes as GetAllProcesses.
func (s *ProcessStorage) FindProcessByID(processID string) api.ProcessInterface {
	for _, finderProcesses := range s.processes {
		for _, p := range finderProcesses {
			if p.id == processID {
				return p
			}
		}
	}
	return nil
}

// FindProcessByPID returns every cached process with the given pid — more
// than one finder may have detected the same OS process.
// NOTE(review): same unlocked read of s.processes as GetAllProcesses.
func (s *ProcessStorage) FindProcessByPID(pid int32) []api.ProcessInterface {
	result := make([]api.ProcessInterface, 0)
	for _, finderProcesses := range s.processes {
		for _, p := range finderProcesses {
			if p.Pid() == pid {
				result = append(result, p)
			}
		}
	}
	return result
}

// AddListener registers a listener and queues it for an immediate recheck of
// all current processes (handled by the StartReport listener goroutine).
// NOTE(review): mutates s.listeners without synchronization; see type note.
func (s *ProcessStorage) AddListener(listener api.ProcessListener) {
	s.listeners = append(s.listeners, listener)
	s.initListenQueue <- listener
}

// DeleteListener removes a previously registered listener (compared by
// interface identity); unknown listeners are a no-op.
// NOTE(review): mutates s.listeners without synchronization; see type note.
func (s *ProcessStorage) DeleteListener(listener api.ProcessListener) {
	result := make([]api.ProcessListener, 0)
	for _, l := range s.listeners {
		if l != listener {
			result = append(result, l)
		}
	}
	s.listeners = result
}

// ProcessOperate identifies the kind of a process lifecycle event.
type ProcessOperate int

const (
	// ProcessOperateAdd signals newly registered processes.
	ProcessOperateAdd = 1
	// ProcessOperateDelete signals processes recognized as dead.
	ProcessOperateDelete = 2
)

// processEventBuilder batches process changes per pid before emitting them
// as events onto the storage's event queue.
type processEventBuilder struct {
	processes map[int32][]api.ProcessInterface // pid -> affected processes
	operate   ProcessOperate
	storage   *ProcessStorage
}

// newProcessEventBuilder creates an empty builder for one operation kind.
func (s *ProcessStorage) newProcessEventBuilder(operate ProcessOperate) *processEventBuilder {
	return &processEventBuilder{
		processes: make(map[int32][]api.ProcessInterface),
		operate:   operate,
		storage:   s,
	}
}

// AddProcess records one affected process under its pid.
func (p *processEventBuilder) AddProcess(pid int32, pi api.ProcessInterface) {
	ps := p.processes[pid]
	ps = append(ps, pi)
	p.processes[pid] = ps
}

// Send emits one event per pid onto the storage's event queue; blocks when
// the queue (cap 100) is full — callers currently invoke this while holding
// s.mutex (see NOTE(review) at the call sites).
func (p *processEventBuilder) Send() {
	for pid, processes := range p.processes {
		p.storage.eventQueue <- &processEvent{
			pid:       pid,
			processes: processes,
			operate:   p.operate,
		}
	}
}

// processEvent is one add/delete notification for a single pid.
type processEvent struct {
	pid       int32
	processes []api.ProcessInterface
	operate   ProcessOperate
}

// consumeProcessEvent dispatches one queued event to every listener,
// calling AddNewProcess for adds and RemoveProcess otherwise.
func (s *ProcessStorage) consumeProcessEvent(listeners []api.ProcessListener, e *processEvent) {
	for _, l := range listeners {
		if e.operate == ProcessOperateAdd {
			l.AddNewProcess(e.pid, e.processes)
		} else {
			l.RemoveProcess(e.pid, e.processes)
		}
	}
}

// notifyToRecheckAllProcesses hands the complete pid -> processes snapshot to
// each listener's RecheckAllProcesses. Used both for the periodic recheck and
// for the one-off recheck of a freshly added listener.
// NOTE(review): reads s.processes without s.mutex — races with the writers;
// the builder's map is also shared (not copied) across all listeners.
func (s *ProcessStorage) notifyToRecheckAllProcesses(listeners []api.ProcessListener) {
	if len(listeners) == 0 {
		return
	}
	// build all processes
	events := s.newProcessEventBuilder(ProcessOperateAdd)
	for _, pcs := range s.processes {
		for _, pc := range pcs {
			events.AddProcess(pc.Pid(), pc)
		}
	}
	for _, l := range listeners {
		l.RecheckAllProcesses(events.processes)
	}
}