pkg/accesslog/runner.go (211 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package accesslog import ( "context" "fmt" "strings" "time" process2 "github.com/shirou/gopsutil/process" "github.com/sirupsen/logrus" "github.com/apache/skywalking-rover/pkg/accesslog/bpf" "github.com/apache/skywalking-rover/pkg/accesslog/collector" "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" "github.com/apache/skywalking-rover/pkg/accesslog/sender" "github.com/apache/skywalking-rover/pkg/core" "github.com/apache/skywalking-rover/pkg/core/backend" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) const kernelAccessLogCacheTime = time.Second * 10 var log = logger.GetLogger("access_log", "runner") type Runner struct { context *common.AccessLogContext collectors []collector.Collector mgr *module.Manager backendOp backend.Operator cluster string ctx context.Context sender *sender.GRPCSender } func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) { bpfLoader, err := bpf.NewLoader() if err != nil { return nil, err } flushDuration, err := time.ParseDuration(config.Flush.Period) if err != nil { return nil, fmt.Errorf("parse flush period error: %v", err) } coreModule := mgr.FindModule(core.ModuleName).(core.Operator) backendOP := coreModule.BackendOperator() clusterName := coreModule.ClusterName() monitorFilter := common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), strings.Split(config.ExcludeClusters, ",")) connectionMgr := common.NewConnectionManager(config, mgr, bpfLoader, monitorFilter) runner := &Runner{ context: &common.AccessLogContext{ BPF: bpfLoader, Config: config, ConnectionMgr: connectionMgr, }, collectors: collector.Collectors(), mgr: mgr, backendOp: backendOP, cluster: clusterName, sender: sender.NewGRPCSender(mgr, connectionMgr), } runner.context.Queue = common.NewQueue(config.Flush.MaxCountOneStream, flushDuration, runner) return runner, nil } func (r *Runner) Start(ctx context.Context) error { r.ctx = ctx r.context.RuntimeContext = ctx r.context.Queue.Start(ctx) r.context.ConnectionMgr.Start(ctx, r.context) r.sender.Start(ctx) for _, c := range r.collectors { err := c.Start(r.mgr, r.context) if err != nil { return err } } if err := r.context.BPF.HasError(); err != nil { return err } return nil } func (r *Runner) Consume(kernels chan common.KernelLog, protocols chan common.ProtocolLog) { if r.backendOp.GetConnectionStatus() != backend.Connected { log.Warnf("failure to connect to the backend, skip generating access log") return } batch := r.sender.NewBatch() r.buildConnectionLogs(batch, kernels, protocols) log.Debugf("ready to send access log, connection count: %d", batch.ConnectionCount()) r.sender.AddBatch(batch) } func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan common.KernelLog, protocols chan common.ProtocolLog) { r.buildKernelLogs(kernels, batch) r.buildProtocolLogs(protocols, batch) r.context.ConnectionMgr.OnBuildConnectionLogFinished() } func (r *Runner) buildKernelLogs(kernels chan common.KernelLog, batch *sender.BatchLogs) { delayAppends := make([]common.KernelLog, 0) for { select { case kernelLog := <-kernels: connection, curLog, delay := r.buildKernelLog(kernelLog) log.Debugf("building kernel log result, connetaion ID: %d, random ID: %d, exist connection: %t, delay: %t", kernelLog.Event().GetConnectionID(), kernelLog.Event().GetRandomID(), connection != nil, delay) if connection != nil && curLog != nil { batch.AppendKernelLog(connection, curLog) } else if delay { delayAppends = append(delayAppends, kernelLog) } default: for _, delayAppend := range delayAppends { select { case kernels <- delayAppend: default: return } } return } } } func (r *Runner) buildProtocolLogs(protocols chan common.ProtocolLog, batch *sender.BatchLogs) { delayAppends := make([]common.ProtocolLog, 0) for { select { case protocolLog := <-protocols: connection, kernelLogs, protocolLogs, delay := r.buildProtocolLog(protocolLog) if log.Enable(logrus.DebugLevel) { kernelLogCount := len(protocolLog.RelateKernelLogs()) var conID, randomID uint64 if kernelLogCount > 0 { conID, randomID = protocolLog.RelateKernelLogs()[0].GetConnectionID(), protocolLog.RelateKernelLogs()[0].GetRandomID() } log.Debugf("building protocol log result, connetaion ID: %d, random ID: %d, connection exist: %t, delay: %t", conID, randomID, connection != nil, delay) } if connection != nil && len(kernelLogs) > 0 && protocolLogs != nil { batch.AppendProtocolLog(connection, kernelLogs, protocolLogs) } else if delay { delayAppends = append(delayAppends, protocolLog) } default: for _, delayAppend := range delayAppends { select { case protocols <- delayAppend: default: return } } return } } } func (r *Runner) shouldReportProcessLog(pid uint32) bool { // if the process not monitoring, then check the process is existed or not if r.context.ConnectionMgr.ProcessIsMonitor(pid) { return true } exists, err := process2.PidExists(int32(pid)) if err != nil { log.Warnf("check pid exists error, pid: %d, error: %v", pid, err) return false } if exists { return false } log.Debugf("the log should be also uploaded because the process quick shutdown but the log exist, pid: %d", pid) return true } func (r *Runner) buildProtocolLog(protocolLog common.ProtocolLog) (*common.ConnectionInfo, []*v3.AccessLogKernelLog, *v3.AccessLogProtocolLogs, bool) { if len(protocolLog.RelateKernelLogs()) == 0 { return nil, nil, nil, false } firstKernelLog := protocolLog.RelateKernelLogs()[0] pid, _ := events.ParseConnectionID(firstKernelLog.GetConnectionID()) // if the process not monitoring, then ignore it if !r.shouldReportProcessLog(pid) { return nil, nil, nil, false } connection := r.context.ConnectionMgr.Find(firstKernelLog) if connection == nil { // if the connection cannot be found, it means that the connection have not been established // just re-add into the queue for checking in the next period if time.Since(firstKernelLog.Timestamp()) > kernelAccessLogCacheTime { return nil, nil, nil, false } return nil, nil, nil, true } kernelLogs := make([]*v3.AccessLogKernelLog, 0) for _, kl := range protocolLog.RelateKernelLogs() { event := forwarder.BuildKernelLogFromEvent(common.LogTypeKernelTransfer, kl) if event == nil { continue } kernelLogs = append(kernelLogs, event) } return connection, kernelLogs, protocolLog.ProtocolLog(), false } func (r *Runner) buildKernelLog(kernelLog common.KernelLog) (*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) { pid, _ := events.ParseConnectionID(kernelLog.Event().GetConnectionID()) // if the process not monitoring, then ignore it if !r.shouldReportProcessLog(pid) { return nil, nil, false } connection := r.context.ConnectionMgr.Find(kernelLog.Event()) if connection == nil { // if the connection cannot be found, it means that the connection have not been established // just re-add into the queue for checking in the next period if time.Since(kernelLog.Event().Timestamp()) > kernelAccessLogCacheTime { return nil, nil, false } return nil, nil, true } event := forwarder.BuildKernelLogFromEvent(kernelLog.Type(), kernelLog.Event()) return connection, event, false } func (r *Runner) Stop() error { r.context.ConnectionMgr.Stop() return nil }