pkg/process/finders/manager.go (116 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package finders
import (
"context"
"fmt"
"time"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/hashicorp/go-multierror"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/process/finders/base"
)
// log is the package-level logger, obtained from logger.GetLogger with the "process" and "finder" names.
var log = logger.GetLogger("process", "finder")
// processListenerRecheckInterval is how often the processes are confirmed (rechecked)
// once a process listener has been registered. NewProcessManager hands it to NewProcessStorage.
var processListenerRecheckInterval = time.Minute
// ProcessManager manages all processes: it holds the active process finders
// and the process storage that its query and listener methods delegate to.
type ProcessManager struct {
// moduleManager is the module manager given to NewProcessManager
moduleManager *module.Manager
// finders maps each active finder configuration to the finder created for it
finders map[base.FinderBaseConfig]base.ProcessFinder
// storage is the process storage created by NewProcessManager
storage *ProcessStorage
}
// ProcessManagerWithFinder is a ProcessManager bound to the detect type of a single finder.
// NewProcessManager creates one per finder and passes it to that finder's Init; the
// processes handed to it are forwarded to the storage together with finderType.
type ProcessManagerWithFinder struct {
*ProcessManager
// finderType is the detect type of the finder this value was created for
finderType api.ProcessDetectType
}
// NewProcessManager builds a ProcessManager from the given finder configurations.
//
// Nil configs and configs whose ActiveFinder reports false are skipped. For every
// remaining config a finder is obtained from getFinder, a ProcessStorage is created
// over all of those finders, and finally each finder is initialized with its config
// and a ProcessManagerWithFinder bound to the finder's detect type.
//
// ctx, moduleManager, reportInterval and propertiesReportFactor are forwarded to
// NewProcessStorage; ctx is also passed to every finder's Init.
//
// An error is returned when no config yields an active finder, when the storage
// cannot be created, or when a finder fails to initialize. The initialization
// error wraps the finder's own error, so callers can inspect the cause with
// errors.Is / errors.As.
func NewProcessManager(ctx context.Context, moduleManager *module.Manager,
	reportInterval time.Duration, propertiesReportFactor int, configs ...base.FinderBaseConfig) (*ProcessManager, error) {
	// locate all active finders; both collections hold at most one entry per config
	confinedFinders := make(map[base.FinderBaseConfig]base.ProcessFinder, len(configs))
	fsList := make([]base.ProcessFinder, 0, len(configs))
	for _, conf := range configs {
		if conf == nil || !conf.ActiveFinder() {
			continue
		}
		finder := getFinder(conf)
		confinedFinders[conf] = finder
		fsList = append(fsList, finder)
	}
	if len(confinedFinders) == 0 {
		return nil, fmt.Errorf("no process finder found")
	}

	// start new storage
	storage, err := NewProcessStorage(ctx, moduleManager, reportInterval, propertiesReportFactor, fsList, processListenerRecheckInterval)
	if err != nil {
		return nil, err
	}

	// init all finders, giving each one a manager view tagged with its own detect type
	manager := &ProcessManager{
		finders:       confinedFinders,
		moduleManager: moduleManager,
		storage:       storage,
	}
	for conf, finder := range confinedFinders {
		processManager := &ProcessManagerWithFinder{ProcessManager: manager, finderType: finder.DetectType()}
		if err := finder.Init(ctx, conf, processManager); err != nil {
			// %w keeps the finder's error in the chain; the message text is unchanged
			return nil, fmt.Errorf("starting %s finder failure: %w", finder.DetectType().Name(), err)
		}
	}
	return manager, nil
}
// Start calls Start on every registered finder and then starts the storage report.
func (m *ProcessManager) Start() {
	// bring up each finder first
	for _, f := range m.finders {
		f.Start()
	}

	// then begin the interval-based storage report
	m.storage.StartReport()
}
// Shutdown stops the storage reporter and then every finder. A failure in one
// step does not keep the remaining steps from running; all failures are combined
// with multierror.Append, and nil is returned when nothing failed.
func (m *ProcessManager) Shutdown() error {
	var failures error

	// the reporter is stopped first
	if reportErr := m.storage.StopReport(); reportErr != nil {
		failures = multierror.Append(failures, reportErr)
	}

	// then each finder in turn
	for _, f := range m.finders {
		if stopErr := f.Stop(); stopErr != nil {
			failures = multierror.Append(failures, stopErr)
		}
	}

	return failures
}
// Finder looks up a registered finder whose detect type equals finderType.
// The boolean result reports whether one was found; when it is false the
// returned finder is nil.
func (m *ProcessManager) Finder(finderType api.ProcessDetectType) (base.ProcessFinder, bool) {
	for _, candidate := range m.finders {
		if candidate.DetectType() != finderType {
			continue
		}
		return candidate, true
	}
	return nil, false
}
// GetModuleManager returns the module manager held by the embedded ProcessManager.
func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
	return p.ProcessManager.moduleManager
}
// SyncAllProcessInFinder forwards processes to the storage's SyncAllProcessInFinder,
// together with the detect type of the finder this manager is bound to.
func (p *ProcessManagerWithFinder) SyncAllProcessInFinder(processes []api.DetectedProcess) {
	store := p.storage
	store.SyncAllProcessInFinder(p.finderType, processes)
}
// AddDetectedProcess forwards processes to the storage's AddNewProcessInFinder,
// together with the detect type of the finder this manager is bound to.
func (p *ProcessManagerWithFinder) AddDetectedProcess(processes []api.DetectedProcess) {
	store := p.storage
	store.AddNewProcessInFinder(p.finderType, processes)
}
// GetAllProcesses returns the result of the storage's GetAllProcesses.
func (m *ProcessManager) GetAllProcesses() []api.ProcessInterface {
	all := m.storage.GetAllProcesses()
	return all
}
// FindProcessByID asks the storage for the process with the given processID and
// returns the storage's answer unchanged.
func (m *ProcessManager) FindProcessByID(processID string) api.ProcessInterface {
	found := m.storage.FindProcessByID(processID)
	return found
}
// FindProcessByPID asks the storage for the processes matching the given pid and
// returns the storage's answer unchanged.
func (m *ProcessManager) FindProcessByPID(pid int32) []api.ProcessInterface {
	matches := m.storage.FindProcessByPID(pid)
	return matches
}
// FindAllRegisteredProcesses returns the result of the storage's FindAllRegisteredProcesses.
func (m *ProcessManager) FindAllRegisteredProcesses() []api.ProcessInterface {
	registered := m.storage.FindAllRegisteredProcesses()
	return registered
}
// AddListener hands listener to the storage's AddListener.
func (m *ProcessManager) AddListener(listener api.ProcessListener) {
	store := m.storage
	store.AddListener(listener)
}
// DeleteListener hands listener to the storage's DeleteListener.
func (m *ProcessManager) DeleteListener(listener api.ProcessListener) {
	store := m.storage
	store.DeleteListener(listener)
}
// ShouldMonitor reports whether any finder's ShouldMonitor returns true for pid.
// Every finder is asked, even after one of them has already answered true.
func (m *ProcessManager) ShouldMonitor(pid int32) bool {
	wanted := false
	for _, f := range m.finders {
		// the call sits on the left of || so it is evaluated for every finder
		wanted = f.ShouldMonitor(pid) || wanted
	}
	return wanted
}