plugins/flusher/sls/flusher_sls.go (69 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build linux || windows // +build linux windows package sls import ( "fmt" "github.com/alibaba/ilogtail/pkg/logtail" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/util" ) // SlsFlusher uses symbols in LogtailAdaptor(.so) to flush log groups to Logtail. type SlsFlusher struct { // nolint:revive EnableShardHash bool KeepShardHash bool context pipeline.Context } // Init ... func (p *SlsFlusher) Init(context pipeline.Context) error { p.context = context return nil } // Description ... func (p *SlsFlusher) Description() string { return "logtail flusher for log service" } // IsReady ... // There is a sending queue in Logtail, call LogtailIsValidToSend through cgo // to make sure if there is any space for coming data. func (p *SlsFlusher) IsReady(projectName string, logstoreName string, logstoreKey int64) bool { return logtail.IsValidToSend(logstoreKey) } // Flush ... // Because IsReady is called before, Logtail must have space in sending queue, // just call LogtailSendPb through cgo to push data into queue, Logtail will // send data to its destination (SLS mostly) according to its config. func (p *SlsFlusher) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error { for _, logGroup := range logGroupList { if len(logGroup.Logs) == 0 { continue } var shardHash string if p.EnableShardHash { for idx, tag := range logGroup.LogTags { if tag.Key == util.ShardHashTagKey { shardHash = tag.Value if !p.KeepShardHash { logGroup.LogTags = append(logGroup.LogTags[0:idx], logGroup.LogTags[idx+1:]...) } break } } } buf, err := logGroup.Marshal() if err != nil { return fmt.Errorf("loggroup marshal err %v", err) } var rst int if !p.EnableShardHash { rst = logtail.SendPb(configName, logGroup.Category, buf, len(logGroup.Logs)) } else { rst = logtail.SendPbV2(configName, logGroup.Category, buf, len(logGroup.Logs), shardHash) } if rst < 0 { return fmt.Errorf("send error %d", rst) } } return nil } // SetUrgent ... // We do nothing here because necessary flag has already been set in Logtail // before this method is called. Any future call of IsReady will return // true so that remaining data can be flushed to Logtail (which will flush // data to local file system) before it quits. func (*SlsFlusher) SetUrgent(flag bool) { } // Stop ... // We do nothing here because SlsFlusher does not create any resources. func (*SlsFlusher) Stop() error { return nil } func init() { pipeline.Flushers["flusher_sls"] = func() pipeline.Flusher { return &SlsFlusher{ EnableShardHash: false, KeepShardHash: true, } } }