pluginmanager/context_imp.go (136 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pluginmanager import ( "context" "encoding/json" "fmt" "sync" "github.com/alibaba/ilogtail/pkg" "github.com/alibaba/ilogtail/pkg/config" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/selfmonitor" "github.com/alibaba/ilogtail/pkg/util" ) type ContextImp struct { MetricsRecords []*selfmonitor.MetricsRecord logstoreConfigMetricRecord *selfmonitor.MetricsRecord common *pkg.LogtailContextMeta pluginNames string ctx context.Context logstoreC *LogstoreConfig } var contextMutex sync.RWMutex func (p *ContextImp) GetRuntimeContext() context.Context { return p.ctx } func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, error) { if p.logstoreC == nil || p.logstoreC.PluginRunner == nil { return nil, fmt.Errorf("pipeline not initialized") } // try to find in extensions that explicitly defined in pipeline exists, ok := p.logstoreC.PluginRunner.GetExtension(name) if ok { return exists, nil } // if it's a naming extension, we won't do further create if isPluginTypeWithID(name) { return nil, fmt.Errorf("not found extension: %s", name) } // create if not found pluginMeta := p.logstoreC.genPluginMeta(name) err := loadExtension(pluginMeta, p.logstoreC, cfg) if err != nil { return nil, err } // get the new created extension exists, ok = p.logstoreC.PluginRunner.GetExtension(pluginMeta.PluginTypeWithID) if !ok { return nil, fmt.Errorf("failed to load extension: %s", pluginMeta.PluginTypeWithID) } return exists, nil } func (p *ContextImp) GetConfigName() string { return p.common.GetConfigName() } func (p *ContextImp) GetProject() string { return p.common.GetProject() } func (p *ContextImp) GetLogstore() string { return p.common.GetLogStore() } func (p *ContextImp) GetPipelineScopeConfig() *config.GlobalConfig { return p.logstoreC.GlobalConfig } func (p *ContextImp) AddPlugin(name string) { if len(p.pluginNames) != 0 { p.pluginNames += "," + name } else { p.pluginNames = name } } func (p *ContextImp) InitContext(project, logstore, configName string) { // bind metadata information. p.ctx, p.common = pkg.NewLogtailContextMeta(project, logstore, configName) } func (p *ContextImp) RegisterMetricRecord(labels []selfmonitor.LabelPair) *selfmonitor.MetricsRecord { contextMutex.Lock() defer contextMutex.Unlock() metricsRecord := &selfmonitor.MetricsRecord{Labels: labels} p.MetricsRecords = append(p.MetricsRecords, metricsRecord) return metricsRecord } func (p *ContextImp) RegisterLogstoreConfigMetricRecord(labels []selfmonitor.LabelPair) *selfmonitor.MetricsRecord { p.logstoreConfigMetricRecord = &selfmonitor.MetricsRecord{ Labels: labels, } return p.logstoreConfigMetricRecord } func (p *ContextImp) GetLogstoreConfigMetricRecord() *selfmonitor.MetricsRecord { return p.logstoreConfigMetricRecord } func (p *ContextImp) GetMetricRecord() *selfmonitor.MetricsRecord { contextMutex.RLock() if len(p.MetricsRecords) > 0 { defer contextMutex.RUnlock() return p.MetricsRecords[len(p.MetricsRecords)-1] } contextMutex.RUnlock() return p.RegisterMetricRecord(nil) } // ExportMetricRecords is used for exporting metrics records. // Each metric is a map[string]string func (p *ContextImp) ExportMetricRecords() []map[string]string { contextMutex.RLock() defer contextMutex.RUnlock() records := make([]map[string]string, 0) for _, metricsRecord := range p.MetricsRecords { records = append(records, metricsRecord.ExportMetricRecords()) } return records } func (p *ContextImp) SaveCheckPoint(key string, value []byte) error { logger.Debug(p.ctx, "save checkpoint, key", key, "value", string(value)) return CheckPointManager.SaveCheckpoint(p.GetConfigName(), key, value) } func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool) { configName := p.GetConfigName() l := len(configName) if l > 2 && configName[l-2:] == "/1" { configName = configName[:l-2] } value, err := CheckPointManager.GetCheckpoint(configName, key) logger.Debug(p.ctx, "get checkpoint, key", key, "value", string(value), "error", err) return value, value != nil } func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error { val, err := json.Marshal(obj) if err != nil { return err } return p.SaveCheckPoint(key, val) } func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool) { val, ok := p.GetCheckPoint(key) if !ok { return false } err := json.Unmarshal(val, obj) if err != nil { logger.Error(p.ctx, "CHECKPOINT_INVALID_ALARM", "invalid checkpoint, key", key, "val", util.CutString(string(val), 1024), "error", err) return false } return true }