telemetry/telemetrybuffer.go (342 lines of code) (raw):

// Copyright 2018 Microsoft. All rights reserved. // MIT License package telemetry import ( "bufio" "context" "encoding/json" "net" "os" "path/filepath" "strings" "sync" "time" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "go.uber.org/zap" ) // TelemetryConfig - telemetry config read by telemetry service type TelemetryConfig struct { ReportToHostIntervalInSeconds time.Duration `json:"reportToHostIntervalInSeconds"` DisableAll bool DisableTrace bool DisableMetric bool DisableMetadataThread bool DebugMode bool DisableTelemetryToNetAgent bool RefreshTimeoutInSecs int BatchIntervalInSecs int BatchSizeInBytes int GetEnvRetryCount int GetEnvRetryWaitTimeInSecs int } // FdName - file descriptor name // Delimiter - delimiter for socket reads/writes // MaxPayloadSize - max buffer size in bytes const ( FdName = "azure-vnet-telemetry" Delimiter = '\n' MaxPayloadSize = 4096 MaxNumReports = 1000 ) // TelemetryBuffer object type TelemetryBuffer struct { client net.Conn listener net.Listener connections []net.Conn FdExists bool Connected bool data chan interface{} cancel chan bool mutex sync.Mutex logger *zap.Logger plc platform.ExecClient } // Buffer object holds the different types of reports type Buffer struct { CNIReports []CNIReport } // NewTelemetryBuffer - create a new TelemetryBuffer func NewTelemetryBuffer(logger *zap.Logger) *TelemetryBuffer { var tb TelemetryBuffer tb.data = make(chan interface{}, MaxNumReports) tb.cancel = make(chan bool, 1) tb.connections = make([]net.Conn, 0) tb.logger = logger tb.plc = platform.NewExecClient(tb.logger) return &tb } func remove(s []net.Conn, i int) []net.Conn { if len(s) > 0 && i < len(s) { s[i] = s[len(s)-1] return s[:len(s)-1] } log.Logf("tb connections remove failed index %v len %v", i, len(s)) return s } // Starts Telemetry server listening on unix domain socket func (tb *TelemetryBuffer) StartServer() error { err := tb.Listen(FdName) if err != nil { tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied") if tb.logger != nil { tb.logger.Error("Listen returns", zap.Error(err)) } else { log.Logf("Listen returns: %v", err.Error()) } return err } if tb.logger != nil { tb.logger.Info("Telemetry service started") } else { log.Logf("Telemetry service started") } // Spawn server goroutine to handle incoming connections go func() { for { // Spawn worker goroutines to communicate with client conn, err := tb.listener.Accept() if err == nil { tb.mutex.Lock() tb.connections = append(tb.connections, conn) tb.mutex.Unlock() go func() { defer func() { var index int var value net.Conn var found bool tb.mutex.Lock() defer tb.mutex.Unlock() for index, value = range tb.connections { if value == conn { conn.Close() found = true break } } if found { tb.connections = remove(tb.connections, index) } }() reader := bufio.NewReader(conn) for { reportStr, readErr := reader.ReadBytes(Delimiter) if readErr != nil { return } reportStr = reportStr[:len(reportStr)-1] var tmp map[string]interface{} err = json.Unmarshal(reportStr, &tmp) if err != nil { if tb.logger != nil { tb.logger.Error("StartServer: unmarshal error", zap.Error(err)) } else { log.Logf("StartServer: unmarshal error:%v", err) } return } if _, ok := tmp["CniSucceeded"]; ok { var cniReport CNIReport err = json.Unmarshal([]byte(reportStr), &cniReport) if err != nil { return } tb.data <- cniReport } else if _, ok := tmp["Metric"]; ok { var aiMetric AIMetric err = json.Unmarshal([]byte(reportStr), &aiMetric) if err != nil { return } tb.data <- aiMetric } else { if tb.logger != nil { tb.logger.Info("StartServer: default", zap.Any("case", tmp)) } else { log.Logf("StartServer: default case:%+v...", tmp) } } } }() } else { if tb.logger != nil { tb.logger.Error("Telemetry Server accept error", zap.Error(err)) } else { log.Logf("Telemetry Server accept error %v", err) } return } } }() return nil } func (tb *TelemetryBuffer) Connect() error { err := tb.Dial(FdName) if err == nil { tb.Connected = true } else if tb.FdExists { tb.Cleanup(FdName) } return err } // PushData - PushData running an instance if it isn't already being run elsewhere func (tb *TelemetryBuffer) PushData(ctx context.Context) { defer tb.Close() for { select { case report := <-tb.data: tb.mutex.Lock() push(report) tb.mutex.Unlock() case <-tb.cancel: if tb.logger != nil { tb.logger.Info("server cancel event") } else { log.Logf("[Telemetry] server cancel event") } return case <-ctx.Done(): if tb.logger != nil { tb.logger.Info("received context done event") } else { log.Logf("[Telemetry] received context done event") } return } } } // Write - write to the file descriptor. func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { buf := make([]byte, len(b)) copy(buf, b) //nolint:makezero //keeping old code buf = append(buf, Delimiter) w := bufio.NewWriter(tb.client) c, err = w.Write(buf) if err == nil { err = w.Flush() } return } // Cancel - signal to tear down telemetry buffer func (tb *TelemetryBuffer) Cancel() { tb.cancel <- true } // Close - close all connections func (tb *TelemetryBuffer) Close() { if tb.client != nil { tb.client.Close() tb.client = nil } if tb.listener != nil { if tb.logger != nil { tb.logger.Info("server close") } else { log.Logf("server close") } tb.listener.Close() } tb.mutex.Lock() defer tb.mutex.Unlock() for _, conn := range tb.connections { if conn != nil { conn.Close() } } tb.connections = nil tb.connections = make([]net.Conn, 0) } // push - push the report (x) to corresponding slice func push(x interface{}) { switch y := x.(type) { case CNIReport: SendAITelemetry(y) case AIMetric: SendAIMetric(y) default: log.Printf("Push fn: Default case:%+v", y) } } // WaitForTelemetrySocket - Block still pipe/sock created or until max attempts retried func WaitForTelemetrySocket(maxAttempt int, waitTimeInMillisecs time.Duration) { for attempt := 0; attempt < maxAttempt; attempt++ { if SockExists() { break } time.Sleep(waitTimeInMillisecs * time.Millisecond) } } // StartTelemetryService - Kills if any telemetry service runs and start new telemetry service func (tb *TelemetryBuffer) StartTelemetryService(path string, args []string) error { err := tb.plc.KillProcessByName(TelemetryServiceProcessName) if err != nil { if tb.logger != nil { tb.logger.Error("Failed to kill process by", zap.String("TelemetryServiceProcessName", TelemetryServiceProcessName), zap.Error(err)) } else { log.Logf("[Telemetry] Failed to kill process by telemetryServiceProcessName %s due to %v", TelemetryServiceProcessName, err) } } if tb.logger != nil { tb.logger.Info("Starting telemetry service process", zap.String("path", path), zap.Any("args", args)) } else { log.Logf("[Telemetry] Starting telemetry service process :%v args:%v", path, args) } if err := common.StartProcess(path, args); err != nil { if tb.logger != nil { tb.logger.Error("Failed to start telemetry service process", zap.Error(err)) } else { log.Logf("[Telemetry] Failed to start telemetry service process :%v", err) } return err } if tb.logger != nil { tb.logger.Info("Telemetry service started") } else { log.Logf("[Telemetry] Telemetry service started") } return nil } // ReadConfigFile - Read telemetry config file and populate to structure func ReadConfigFile(filePath string) (TelemetryConfig, error) { config := TelemetryConfig{} b, err := os.ReadFile(filePath) if err != nil { return config, err } if err = json.Unmarshal(b, &config); err != nil { return config, err // nolint } return config, err } // ConnectToTelemetryService - Attempt to spawn telemetry process if it's not already running. func (tb *TelemetryBuffer) ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds int) { path, dir := getTelemetryServiceDirectory() args := []string{"-d", dir} for attempt := 0; attempt < 2; attempt++ { if err := tb.Connect(); err != nil { if tb.logger != nil { tb.logger.Error("Connection to telemetry socket failed", zap.Error(err)) } else { log.Logf("Connection to telemetry socket failed: %v", err) } if _, exists := os.Stat(path); exists != nil { if tb.logger != nil { tb.logger.Info("Skip starting telemetry service as file didn't exist") } else { log.Logf("Skip starting telemetry service as file didn't exist") } return } tb.Cleanup(FdName) tb.StartTelemetryService(path, args) // nolint WaitForTelemetrySocket(telemetryNumRetries, time.Duration(telemetryWaitTimeInMilliseconds)) } else { tb.Connected = true if tb.logger != nil { tb.logger.Info("Connected to telemetry service") } else { log.Logf("Connected to telemetry service") } return } } } // ConnectToTelemetry - attempt to connect to telemetry service func (tb *TelemetryBuffer) ConnectToTelemetry() { if err := tb.Connect(); err != nil { log.Logf("Connection to telemetry socket failed: %v", err) return } tb.Connected = true log.Logf("Connected to telemetry service") } // getTelemetryServiceDirectory - check CNI install directory and Executable location for telemetry binary func getTelemetryServiceDirectory() (path string, dir string) { path = filepath.Join(CniInstallDir, TelemetryServiceProcessName) if _, exists := os.Stat(path); exists != nil { ex, _ := os.Executable() exDir := filepath.Dir(ex) path = filepath.Join(exDir, TelemetryServiceProcessName) dir = exDir } else { dir = CniInstallDir } return }