pkg/helper/local_context.go (114 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package helper
import (
"context"
"encoding/json"
"sync"
"github.com/alibaba/ilogtail/pkg"
"github.com/alibaba/ilogtail/pkg/config"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
"github.com/alibaba/ilogtail/pkg/util"
)
type LocalContext struct {
MetricsRecords []*selfmonitor.MetricsRecord
logstoreConfigMetricRecord *selfmonitor.MetricsRecord
AllCheckPoint map[string][]byte
ctx context.Context
pluginNames string
common *pkg.LogtailContextMeta
}
var contextMutex sync.RWMutex
func (p *LocalContext) GetConfigName() string {
return p.common.GetConfigName()
}
func (p *LocalContext) GetProject() string {
return p.common.GetProject()
}
func (p *LocalContext) GetLogstore() string {
return p.common.GetLogStore()
}
func (p *LocalContext) GetPipelineScopeConfig() *config.GlobalConfig {
return &config.GlobalConfig{}
}
func (p *LocalContext) AddPlugin(name string) {
if len(p.pluginNames) != 0 {
p.pluginNames += "," + name
} else {
p.pluginNames = name
}
}
func (p *LocalContext) InitContext(project, logstore, configName string) {
// bind metadata information.
p.ctx, p.common = pkg.NewLogtailContextMetaWithoutAlarm(project, logstore, configName)
p.AllCheckPoint = make(map[string][]byte)
}
func (p *LocalContext) GetRuntimeContext() context.Context {
return p.ctx
}
func (p *LocalContext) GetExtension(name string, cfg any) (pipeline.Extension, error) {
return nil, nil
}
func (p *LocalContext) RegisterMetricRecord(labels []selfmonitor.LabelPair) *selfmonitor.MetricsRecord {
contextMutex.Lock()
defer contextMutex.Unlock()
metricsRecord := &selfmonitor.MetricsRecord{Labels: labels}
p.MetricsRecords = append(p.MetricsRecords, metricsRecord)
return metricsRecord
}
func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels []selfmonitor.LabelPair) *selfmonitor.MetricsRecord {
p.logstoreConfigMetricRecord = &selfmonitor.MetricsRecord{
Labels: labels,
}
return p.logstoreConfigMetricRecord
}
func (p *LocalContext) GetLogstoreConfigMetricRecord() *selfmonitor.MetricsRecord {
return p.logstoreConfigMetricRecord
}
func (p *LocalContext) GetMetricRecord() *selfmonitor.MetricsRecord {
contextMutex.RLock()
if len(p.MetricsRecords) > 0 {
defer contextMutex.RUnlock()
return p.MetricsRecords[len(p.MetricsRecords)-1]
}
contextMutex.RUnlock()
return p.RegisterMetricRecord(nil)
}
// ExportMetricRecords is used for exporting metrics records.
// Each metric is a map[string]string
func (p *LocalContext) ExportMetricRecords() []map[string]string {
contextMutex.RLock()
defer contextMutex.RUnlock()
records := make([]map[string]string, 0)
for _, metricsRecord := range p.MetricsRecords {
records = append(records, metricsRecord.ExportMetricRecords())
}
return records
}
func (p *LocalContext) SaveCheckPoint(key string, value []byte) error {
logger.Debug(p.ctx, "save checkpoint, key", key, "value", string(value))
p.AllCheckPoint[key] = value
return nil
}
func (p *LocalContext) GetCheckPoint(key string) (value []byte, exist bool) {
value, exist = p.AllCheckPoint[key]
logger.Debug(p.ctx, "get checkpoint, key", key, "value", string(value))
return value, exist
}
func (p *LocalContext) SaveCheckPointObject(key string, obj interface{}) error {
val, err := json.Marshal(obj)
if err != nil {
logger.Debug(p.ctx, "CHECKPOINT_INVALID_ALARM", "save checkpoint error, invalid checkpoint, key", key, "val", util.CutString(string(val), 1024), "error", err)
return err
}
return p.SaveCheckPoint(key, val)
}
func (p *LocalContext) GetCheckPointObject(key string, obj interface{}) (exist bool) {
val, ok := p.GetCheckPoint(key)
if !ok {
return false
}
err := json.Unmarshal(val, obj)
if err != nil {
logger.Debug(p.ctx, "CHECKPOINT_INVALID_ALARM", "get checkpoint error, invalid checkpoint, key", key, "val", util.CutString(string(val), 1024), "error", err)
return false
}
return true
}