plugin_main/plugin_export.go (265 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "encoding/json" "fmt" "runtime" "runtime/debug" "sync" "time" "unsafe" "github.com/alibaba/ilogtail/pkg/config" "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/helper/k8smeta" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/util" "github.com/alibaba/ilogtail/pluginmanager" ) /* #include <stdlib.h> static char**makeCharArray(int size) { return malloc(sizeof(char*) * size); } static void setArrayString(char **a, char *s, int n) { a[n] = s; } struct containerMeta{ char* podName; char* k8sNamespace; char* containerName; char* image; int k8sLabelsSize; int containerLabelsSize; int envSize; char** k8sLabelsKey; char** k8sLabelsVal; char** containerLabelsKey; char** containerLabelsVal; char** envsKey; char** envsVal; }; typedef struct { char* key; char* value; } KeyValue; typedef struct { KeyValue** keyValues; int count; } PluginMetric; typedef struct { PluginMetric** metrics; int count; } PluginMetrics; static KeyValue** makeKeyValueArray(int size) { return malloc(sizeof(KeyValue*) * size); } static void setArrayKeyValue(KeyValue **a, KeyValue *s, int n) { a[n] = s; } static PluginMetric** makePluginMetricArray(int size) { return malloc(sizeof(KeyValue*) * size); } static void setArrayPluginMetric(PluginMetric **a, PluginMetric *s, int n) { a[n] = s; } */ import "C" //nolint:typecheck var initOnce sync.Once var loadOnce sync.Once //export InitPluginBase func InitPluginBase() int { return initPluginBase("") } //export InitPluginBaseV2 func InitPluginBaseV2(cfgStr string) int { return initPluginBase(cfgStr) } //export LoadGlobalConfig func LoadGlobalConfig(jsonStr string) int { // Only the first call will return non-zero. retcode := 0 loadOnce.Do(func() { if len(jsonStr) >= 2 { // For invalid JSON, use default value and return 0 if err := json.Unmarshal([]byte(jsonStr), &config.LoongcollectorGlobalConfig); err != nil { fmt.Println("load global config error", "GlobalConfig", jsonStr, "err", err) retcode = 1 } logger.InitLogger() for _, log := range flags.LogsWaitToPrint { switch log.LogType { case flags.LogTypeError: logger.Error(context.Background(), log.Content) case flags.LogTypeInfo: logger.Info(context.Background(), log.Content) case flags.LogTypeDebug: logger.Debug(context.Background(), log.Content) case flags.LogTypeWarning: logger.Warning(context.Background(), log.Content) } } logger.Info(context.Background(), "load global config", jsonStr) config.UserAgent = fmt.Sprintf("ilogtail/%v (%v) ip/%v", config.BaseVersion, runtime.GOOS, config.LoongcollectorGlobalConfig.HostIP) } }) if retcode == 0 { // Update when both of them are not empty. logger.Debugf(context.Background(), "host IP: %v, hostname: %v", config.LoongcollectorGlobalConfig.HostIP, config.LoongcollectorGlobalConfig.Hostname) if len(config.LoongcollectorGlobalConfig.Hostname) > 0 && len(config.LoongcollectorGlobalConfig.HostIP) > 0 { util.SetNetworkIdentification(config.LoongcollectorGlobalConfig.HostIP, config.LoongcollectorGlobalConfig.Hostname) } } return retcode } //export LoadPipeline func LoadPipeline(project string, logstore string, configName string, logstoreKey int64, jsonStr string) int { logger.Debug(context.Background(), "load config", configName, logstoreKey, "\n"+jsonStr) defer func() { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "panicked", err, "stack", string(trace)) } }() err := pluginmanager.LoadLogstoreConfig(util.StringDeepCopy(project), util.StringDeepCopy(logstore), util.StringDeepCopy(configName), // Make deep copy if you want to save it in Go in the future. logstoreKey, jsonStr) if err != nil { logger.Error(context.Background(), "CONFIG_LOAD_ALARM", "load config error, project", project, "logstore", logstore, "config", configName, "error", err) return 1 } return 0 } //export UnloadPipeline func UnloadPipeline(configName string) int { logger.Debug(context.Background(), "unload config", configName) err := pluginmanager.UnloadPartiallyLoadedConfig(util.StringDeepCopy(configName)) if err != nil { return 1 } return 0 } //export ProcessLog func ProcessLog(configName string, logBytes []byte, packID string, topic string, tags []byte) int { pluginmanager.LogtailConfigLock.RLock() config, flag := pluginmanager.LogtailConfig[configName] if !flag { return -1 } pluginmanager.LogtailConfigLock.RUnlock() return config.ProcessLog(logBytes, util.StringDeepCopy(packID), util.StringDeepCopy(topic), tags) } //export ProcessLogGroup func ProcessLogGroup(configName string, logBytes []byte, packID string) int { pluginmanager.LogtailConfigLock.RLock() config, flag := pluginmanager.LogtailConfig[configName] pluginmanager.LogtailConfigLock.RUnlock() if !flag { logger.Error(context.Background(), "PLUGIN_ALARM", "config not found", configName) return -1 } return config.ProcessLogGroup(logBytes, util.StringDeepCopy(packID)) } //export StopAllPipelines func StopAllPipelines(withInputFlag int) { logger.Info(context.Background(), "Stop all", "start", "with input", withInputFlag) err := pluginmanager.StopAllPipelines(withInputFlag != 0) if err != nil { logger.Error(context.Background(), "PLUGIN_ALARM", "stop all error", err) } logger.Info(context.Background(), "Stop all", "success", "with input", withInputFlag) // Stop with input first, without input last. if withInputFlag == 0 { logger.Info(context.Background(), "logger", "close and recover") logger.Flush() logger.Close() } } //export Stop func Stop(configName string, removedFlag int) { logger.Info(context.Background(), "Stop", "start", "config", configName, "removed", removedFlag) err := pluginmanager.Stop(configName, removedFlag != 0) if err != nil { logger.Error(context.Background(), "PLUGIN_ALARM", "stop error", err) } } //export StopBuiltInModules func StopBuiltInModules() { pluginmanager.StopBuiltInModulesConfig() } //export Start func Start(configName string) { logger.Info(context.Background(), "Start", "start", "config", configName) err := pluginmanager.Start(configName) if err != nil { logger.Error(context.Background(), "PLUGIN_ALARM", "start error", err) } logger.Info(context.Background(), "Start", "success", "config", configName) } //export CtlCmd func CtlCmd(configName string, cmdID int, cmdDetail string) { logger.Info(context.Background(), "execute cmd", cmdID, "detail", cmdDetail, "config", configName) } //export GetContainerMeta func GetContainerMeta(containerID string) *C.struct_containerMeta { logger.InitLogger() meta := helper.GetContainerMeta(containerID) if meta == nil { logger.Debug(context.Background(), "get meta", "") return nil } if logger.DebugFlag() { bytes, _ := json.Marshal(meta) logger.Debug(context.Background(), "get meta", string(bytes)) } returnStruct := (*C.struct_containerMeta)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_containerMeta{})))) returnStruct.podName = C.CString(meta.PodName) returnStruct.k8sNamespace = C.CString(meta.K8sNamespace) returnStruct.containerName = C.CString(meta.ContainerName) returnStruct.image = C.CString(meta.Image) returnStruct.k8sLabelsSize = C.int(len(meta.K8sLabels)) if len(meta.K8sLabels) > 0 { returnStruct.k8sLabelsKey = C.makeCharArray(returnStruct.k8sLabelsSize) returnStruct.k8sLabelsVal = C.makeCharArray(returnStruct.k8sLabelsSize) count := 0 for k, v := range meta.K8sLabels { C.setArrayString(returnStruct.k8sLabelsKey, C.CString(k), C.int(count)) C.setArrayString(returnStruct.k8sLabelsVal, C.CString(v), C.int(count)) count++ } } returnStruct.containerLabelsSize = C.int(len(meta.ContainerLabels)) if len(meta.ContainerLabels) > 0 { returnStruct.containerLabelsKey = C.makeCharArray(returnStruct.containerLabelsSize) returnStruct.containerLabelsVal = C.makeCharArray(returnStruct.containerLabelsSize) count := 0 for k, v := range meta.ContainerLabels { C.setArrayString(returnStruct.containerLabelsKey, C.CString(k), C.int(count)) C.setArrayString(returnStruct.containerLabelsVal, C.CString(v), C.int(count)) count++ } } returnStruct.envSize = C.int(len(meta.Env)) if len(meta.Env) > 0 { returnStruct.envsKey = C.makeCharArray(returnStruct.envSize) returnStruct.envsVal = C.makeCharArray(returnStruct.envSize) count := 0 for k, v := range meta.Env { C.setArrayString(returnStruct.envsKey, C.CString(k), C.int(count)) C.setArrayString(returnStruct.envsVal, C.CString(v), C.int(count)) count++ } } return returnStruct } //export GetGoMetrics func GetGoMetrics(metricType string) *C.PluginMetrics { results := pluginmanager.GetMetrics(metricType) // 统计所有键值对的总数,用于分配内存 numMetrics := len(results) cPluginMetrics := (*C.PluginMetrics)(C.malloc(C.sizeof_PluginMetrics)) cPluginMetrics.count = C.int(numMetrics) cPluginMetrics.metrics = C.makePluginMetricArray(cPluginMetrics.count) // 填充 PluginMetrics 中的 keyValues for i, metric := range results { metricLen := len(metric) cMetric := (*C.PluginMetric)(C.malloc(C.sizeof_PluginMetric)) cMetric.count = C.int(metricLen) cMetric.keyValues = C.makeKeyValueArray(cMetric.count) j := 0 for k, v := range metric { cKey := C.CString(k) cValue := C.CString(v) cKeyValue := (*C.KeyValue)(C.malloc(C.sizeof_KeyValue)) cKeyValue.key = cKey cKeyValue.value = cValue C.setArrayKeyValue(cMetric.keyValues, cKeyValue, C.int(j)) j++ } C.setArrayPluginMetric(cPluginMetrics.metrics, cMetric, C.int(i)) } return cPluginMetrics } func initPluginBase(cfgStr string) int { // Only the first call will return non-zero. rst := 0 initOnce.Do(func() { LoadGlobalConfig(cfgStr) InitHTTPServer() setGCPercentForSlowStart() logger.Info(context.Background(), "init plugin base, version", config.BaseVersion) if *flags.DeployMode == flags.DeploySingleton && *flags.EnableKubernetesMeta { instance := k8smeta.GetMetaManagerInstance() err := instance.Init("") if err != nil { logger.Error(context.Background(), "K8S_META_INIT_FAIL", "init k8s meta manager fail", err) return } stopCh := make(chan struct{}) instance.Run(stopCh) } if err := pluginmanager.Init(); err != nil { logger.Error(context.Background(), "PLUGIN_ALARM", "init plugin error", err) rst = 1 } if pluginmanager.AlarmConfig != nil { pluginmanager.AlarmConfig.Start() } if pluginmanager.ContainerConfig != nil { pluginmanager.ContainerConfig.Start() } err := pluginmanager.CheckPointManager.Init() if err != nil { logger.Error(context.Background(), "CHECKPOINT_INIT_ALARM", "init checkpoint manager error", err) } pluginmanager.CheckPointManager.Start() }) return rst } // setGCPercentForSlowStart sets GC percent with a small value at startup // to avoid high RSS (caused by data catch-up) to trigger OOM-kill. func setGCPercentForSlowStart() { gcPercent := 20 _ = util.InitFromEnvInt("ALIYUN_LOGTAIL_GOLANG_GC_PERCENT", &gcPercent, gcPercent) defaultGCPercent := debug.SetGCPercent(gcPercent) logger.Infof(context.Background(), "set startup GC percent from %v to %v", defaultGCPercent, gcPercent) resumeSeconds := 5 * 60 _ = util.InitFromEnvInt("ALIYUN_LOGTAIL_GOLANG_GC_PERCENT_RESUME_SEC", &resumeSeconds, resumeSeconds) go func(pc int, sec int) { time.Sleep(time.Second * time.Duration(sec)) last := debug.SetGCPercent(pc) logger.Infof(context.Background(), "resume GC percent from %v to %v", last, pc) }(defaultGCPercent, resumeSeconds) }