plugins/input/telegraf/telegraf_manager.go (298 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package telegraf import ( "context" "fmt" "os" "os/exec" "path" "strings" "sync" "time" "github.com/alibaba/ilogtail/pkg" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/util" ) var statusCheckInterval = time.Second * time.Duration(30) type Config struct { Name string Detail string } // Telegraf supervisor for agent start, stop, config reload... // // Because Telegraf will send all inputs' data to all outputs, so only ONE Logtail // config will be passed to Telegraf simultaneously. // // Data link: Telegraf ------ HTTP ------> Logtail ----- Protobuf ------> SLS. // Logtail will work as an InfluxDB server to receive data from telegraf by HTTP protocol. type Manager struct { // Although only one config will be loaded, all configs should be saved for replacement // while current config is unregistered. configs map[string]*Config mu sync.Mutex loadedConfigs map[string]*Config ch chan struct{} telegrafPath string telegrafdPath string telegrafConfPath string collector *LogCollector bindMeta *helper.ManagerMeta } func (tm *Manager) RegisterConfig(ctx pipeline.Context, c *Config) { tm.mu.Lock() tm.configs[c.Name] = c ltCtx, ok := ctx.GetRuntimeContext().Value(pkg.LogTailMeta).(*pkg.LogtailContextMeta) if ok { tm.bindMeta.Add(ltCtx.GetProject(), ltCtx.GetLogStore(), ltCtx.GetConfigName()) } tm.mu.Unlock() logger.Debugf(telegrafManager.GetContext(), "register config: %v", c) tm.notify() } func (tm *Manager) UnregisterConfig(ctx pipeline.Context, c *Config) { tm.mu.Lock() delete(tm.configs, c.Name) ltCtx, ok := ctx.GetRuntimeContext().Value(pkg.LogTailMeta).(*pkg.LogtailContextMeta) if ok { tm.bindMeta.Delete(ltCtx.GetProject(), ltCtx.GetLogStore(), ltCtx.GetConfigName()) } tm.mu.Unlock() logger.Debugf(telegrafManager.GetContext(), "unregister config: %v", c) tm.notify() } func (tm *Manager) notify() { select { case tm.ch <- struct{}{}: default: } } func isPathExist(p string) (bool, error) { _, err := os.Stat(p) switch { case err == nil: return true, nil case os.IsNotExist(err): return false, nil default: return false, err } } // shouldCreatePath returns true if p is not existing and no error when stat. func shouldCreatePath(p string) bool { ret, err := isPathExist(p) if err == nil { return !ret } logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "stat path %v err: %v", p, err) return false } // makeSureDirectoryExist returns true if the directory is created just now. func makeSureDirectoryExist(p string) (bool, error) { if !shouldCreatePath(p) { return false, nil } return true, os.MkdirAll(p, 0750) } const defaultConfFileName = "telegraf.conf" const defaultConfig = ` # DO NOT MODIFY: It will be overwrited when Logtail starts. [agent] debug = %t logfile = "telegraf.log" logfile_rotation_max_size = 1024000 logfile_rotation_max_archives = 2 ` func (tm *Manager) initAgentDir() { if newDir, err := makeSureDirectoryExist(tm.telegrafConfPath); newDir { if err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "create conf dir error, path %v, err: %v", tm.telegrafConfPath, err) } } else { // Clean config files (outdated) in conf directory. if files, err := os.ReadDir(tm.telegrafConfPath); err == nil { for _, f := range files { filePath := path.Join(tm.telegrafConfPath, f.Name()) if err = os.Remove(filePath); err == nil { logger.Infof(telegrafManager.GetContext(), "delete outdated agent config file: %v", filePath) } else { logger.Warningf(telegrafManager.GetContext(), "deleted outdated agent config file err, path: %v, err: %v", filePath, err) } } } else { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "clean conf dir error, path %v, err: %v", tm.telegrafConfPath, err) } } defaultConfigPath := path.Join(tm.telegrafPath, defaultConfFileName) if err := os.WriteFile(defaultConfigPath, []byte(fmt.Sprintf(defaultConfig, logger.DebugFlag())), 0600); err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "write default config error, path: %v, err: %v", defaultConfigPath, err) } } func (tm *Manager) run() { tm.initAgentDir() for { select { case <-time.After(statusCheckInterval): case <-tm.ch: } logger.Debugf(telegrafManager.GetContext(), "start to check") tm.check() logger.Debugf(telegrafManager.GetContext(), "check done") } } func (tm *Manager) getLatestConfigs() map[string]*Config { tm.mu.Lock() defer tm.mu.Unlock() if len(tm.configs) == 0 { return nil } toLoadConfigs := make(map[string]*Config) for name, cfg := range tm.configs { toLoadConfigs[name] = cfg } return toLoadConfigs } func (tm *Manager) check() { configs := tm.getLatestConfigs() logger.Debugf(telegrafManager.GetContext(), "latest configs: %v", configs) // Clear all loaded config files and stop telegraf. if configs == nil { if len(tm.loadedConfigs) > 0 { for name := range tm.loadedConfigs { tm.removeConfigFile(name) } logger.Infof(telegrafManager.GetContext(), "clear all configs and stop agent, count: %v", len(tm.loadedConfigs)) tm.loadedConfigs = make(map[string]*Config) } tm.stop() tm.collector.TelegrafStop() return } if !tm.install() { return } // Still have configs, do comparison. toRemoveConfigs := make([]string, 0) toAddOrUpdateConfigs := make([]*Config, 0) for name, curCfg := range tm.loadedConfigs { if cfg, exists := configs[name]; exists { if curCfg.Detail != cfg.Detail { toAddOrUpdateConfigs = append(toAddOrUpdateConfigs, cfg) } } else { toRemoveConfigs = append(toRemoveConfigs, name) } } for name, cfg := range configs { if _, exists := tm.loadedConfigs[name]; !exists { toAddOrUpdateConfigs = append(toAddOrUpdateConfigs, cfg) } } for _, name := range toRemoveConfigs { tm.removeConfigFile(name) delete(tm.loadedConfigs, name) } for _, cfg := range toAddOrUpdateConfigs { if tm.overwriteConfigFile(cfg) { tm.loadedConfigs[cfg.Name] = cfg } } if len(toRemoveConfigs) == 0 && len(toAddOrUpdateConfigs) == 0 { tm.start() } else { tm.reload() } tm.collector.TelegrafStart() } func (tm *Manager) concatConfFilePath(name string) string { return path.Join(tm.telegrafConfPath, fmt.Sprintf("%v.conf", name)) } func (tm *Manager) overwriteConfigFile(cfg *Config) bool { filePath := tm.concatConfFilePath(cfg.Name) if _, err := makeSureDirectoryExist(tm.telegrafConfPath); err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_OVERWRITE_CONFIG_ALARM", "overwrite local config file error, path: %v err: %v", filePath, err) return false } if err := os.WriteFile(filePath, []byte(cfg.Detail), 0600); err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_OVERWRITE_CONFIG_ALARM", "overwrite local config file error, path: %v err: %v", filePath, err) return false } logger.Infof(telegrafManager.GetContext(), "overwrite agent config %v", cfg.Name) return true } func (tm *Manager) removeConfigFile(name string) { filePath := path.Join(tm.telegrafConfPath, fmt.Sprintf("%v.conf", name)) if err := os.Remove(filePath); err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_REMOVE_CONFIG_ALARM", "remove local config file error, path: %v, err: %v", filePath, err) return } logger.Infof(telegrafManager.GetContext(), "remove agent config %v", name) } func (tm *Manager) runTelegrafd(command string, needOutput bool) (output []byte, err error) { cmd := exec.Command(tm.telegrafdPath, command) if needOutput { output, err = cmd.CombinedOutput() } else { // Must call start/reload without output, because they might fork sub process, // which will hang when CombinedOutput is called. err = cmd.Run() } // Workaround: exec.Command throws wait:no child process error always under c-shared buildmode. // TODO: try cgo, implement exec with C and popen. if err != nil && !strings.Contains(err.Error(), "no child process") { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "%v error, output: %v, error: %v", command, string(output), err) } return } // install returns true if agent has been installed. func (tm *Manager) install() bool { if exist, err := isPathExist(tm.telegrafdPath); err != nil { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "stat path %v err when install: %v", tm.telegrafdPath, err) return false } else if exist { return true } scriptPath := path.Join(tm.telegrafPath, "install.sh") if exist, err := isPathExist(scriptPath); err != nil || !exist { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "can not find install script %v, maybe stat error: %v", scriptPath, err) return false } // Install by execute install.sh cmd := exec.Command(scriptPath) output, err := cmd.CombinedOutput() if err != nil && !strings.Contains(err.Error(), "no child process") { logger.Warningf(telegrafManager.GetContext(), "SERVICE_TELEGRAF_RUNTIME_ALARM", "install agent error, output: %v, error: %v", string(output), err) return false } tm.initAgentDir() logger.Infof(telegrafManager.GetContext(), "install agent done, output: %v", string(output)) return true } func (tm *Manager) start() { _, _ = tm.runTelegrafd("start", false) } func (tm *Manager) stop() { if exist, _ := isPathExist(tm.telegrafdPath); !exist { return } _, _ = tm.runTelegrafd("stop", false) } func (tm *Manager) reload() { _, _ = tm.runTelegrafd("reload", false) logger.Infof(telegrafManager.GetContext(), "agent config reloaded") } func (tm *Manager) GetContext() context.Context { return tm.bindMeta.GetContext() } var telegrafManager *Manager var once sync.Once func GetTelegrafManager(agentDirPath string) *Manager { once.Do(func() { telegrafManager = &Manager{ configs: make(map[string]*Config), loadedConfigs: make(map[string]*Config), ch: make(chan struct{}, 1), telegrafPath: agentDirPath, bindMeta: helper.NewmanagerMeta("telegraf"), telegrafdPath: path.Join(agentDirPath, "telegrafd"), telegrafConfPath: path.Join(agentDirPath, "conf.d"), } // don't init the collector with struct because the collector depends on the bindMeta. telegrafManager.collector = NewLogCollector(agentDirPath) util.RegisterAlarm("telegraf", telegrafManager.bindMeta.GetAlarm()) go telegrafManager.run() go telegrafManager.collector.Run() }) return telegrafManager }