plugins/flusher/stdout/flusher_stdout.go (257 lines of code) (raw):

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stdout

import (
	"encoding/json"
	"fmt"
	"strconv"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/models"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
	"github.com/cihub/seelog"
	jsoniter "github.com/json-iterator/go"
)

// flushMsg is the seelog XML configuration template used to build the
// plugin's private logger. The single %s verb is replaced with the output
// element chosen in Init; the doubled %% sequences survive fmt.Sprintf as
// literal % characters, so seelog receives its own format verbs.
const flushMsg = ` <seelog minlevel="info" > <outputs formatid="common"> %s </outputs> <formats> <format id="common" format="%%Date %%Time %%Msg%%n" /> </formats> </seelog> `

// FlusherStdout is a flusher plugin of the plugin system.
// It has two usages:
//  1. flush the logGroups to stdout;
//  2. flush the logGroups to a file. If no file name is configured (and
//     OnlyStdout is false), the logGroups are appended to the global log
//     file through the shared logger instead.
type FlusherStdout struct {
	// FileName is the target of the size-based rolling file output. It is
	// only consulted when OnlyStdout is false.
	FileName string
	// MaxSize is passed as the rolling file's maxsize attribute; values <= 0
	// are replaced with 1024 * 1024 in Init.
	MaxSize int
	// MaxRolls is passed as the rolling file's maxrolls attribute; values <= 0
	// are replaced with 1 in Init.
	MaxRolls int
	// KeyValuePairs makes Flush print every log as one flat JSON object built
	// from its contents plus a "__time__" field; otherwise the log is printed
	// via json.Marshal. The registered factory enables it by default.
	KeyValuePairs bool
	// Tags additionally prints one summary line per group (counts and tags).
	Tags bool
	// OnlyStdout routes the output to the console and takes precedence over
	// FileName.
	OnlyStdout bool

	// context is the pipeline context captured in Init.
	context pipeline.Context
	// outLogger is the dedicated seelog logger; it stays nil when neither the
	// console nor a file is configured, in which case the shared logger is used.
	outLogger seelog.LoggerInterface
}

// Init is invoked before the plugin starts working. It chooses the log
// output channel for the plugin.
func (p *FlusherStdout) Init(context pipeline.Context) error { p.context = context pattern := "" if p.OnlyStdout { pattern = "<console/>" logger.CloseCatchStdout() } else if p.FileName != "" { pattern = `<rollingfile type="size" filename="%s" maxsize="%d" maxrolls="%d"/>` if p.MaxSize <= 0 { p.MaxSize = 1024 * 1024 } if p.MaxRolls <= 0 { p.MaxRolls = 1 } pattern = fmt.Sprintf(pattern, p.FileName, p.MaxSize, p.MaxRolls) } if pattern != "" { var err error p.outLogger, err = seelog.LoggerFromConfigAsString(fmt.Sprintf(flushMsg, pattern)) if err != nil { logger.Error(p.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init stdout flusher fail, error", err) p.outLogger = seelog.Disabled } } return nil } func (*FlusherStdout) Description() string { return "stdout flusher for logtail" } // Flush the logGroup list to stdout or files. func (p *FlusherStdout) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error { for _, logGroup := range logGroupList { if p.Tags { if p.outLogger != nil { p.outLogger.Infof("[LogGroup] topic %s, logstore %s, logcount %d, tags %v", logGroup.Topic, logGroup.Category, len(logGroup.Logs), logGroup.LogTags) } else { logger.Info(p.context.GetRuntimeContext(), "[LogGroup] topic", logGroup.Topic, "logstore", logGroup.Category, "logcount", len(logGroup.Logs), "tags", logGroup.LogTags) } } if p.KeyValuePairs { for _, log := range logGroup.Logs { writer := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 128) writer.WriteObjectStart() for _, c := range log.Contents { writer.WriteObjectField(c.Key) writer.WriteString(c.Value) _, _ = writer.Write([]byte{','}) } writer.WriteObjectField("__time__") writer.WriteString(strconv.Itoa(int(log.Time))) writer.WriteObjectEnd() if p.outLogger != nil { p.outLogger.Infof("%s", writer.Buffer()) } else { logger.Info(p.context.GetRuntimeContext(), string(writer.Buffer())) } } } else { for _, log := range logGroup.Logs { buf, _ := json.Marshal(log) if p.outLogger != 
nil { p.outLogger.Infof("%s", buf) } else { logger.Info(p.context.GetRuntimeContext(), string(buf)) } } } } return nil } func (p *FlusherStdout) Export(in []*models.PipelineGroupEvents, context pipeline.PipelineContext) error { for _, groupEvents := range in { if p.Tags { metadata := fmt.Sprintf("%v", groupEvents.Group.GetMetadata().Iterator()) tags := fmt.Sprintf("%v", groupEvents.Group.GetTags().Iterator()) if p.outLogger != nil { p.outLogger.Infof("[Event] event %d, metadata %s, tags %s", len(groupEvents.Events), metadata, tags) } else { logger.Info(p.context.GetRuntimeContext(), "[Event] event", len(groupEvents.Events), "metadata", metadata, "tags", tags) } } for _, event := range groupEvents.Events { writer := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 128) writer.WriteObjectStart() writer.WriteObjectField("eventType") switch event.GetType() { case models.EventTypeMetric: writer.WriteString("metric") case models.EventTypeSpan: writer.WriteString("span") case models.EventTypeLogging: writer.WriteString("log") case models.EventTypeByteArray: writer.WriteString("byteArray") } _, _ = writer.Write([]byte{','}) writer.WriteObjectField("name") writer.WriteString(event.GetName()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("timestamp") writer.WriteUint64(event.GetTimestamp()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("observedTimestamp") writer.WriteUint64(event.GetObservedTimestamp()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("tags") writer.WriteObjectStart() i := 0 for k, v := range event.GetTags().Iterator() { writer.WriteObjectField(k) writer.WriteString(v) if i < event.GetTags().Len()-1 { _, _ = writer.Write([]byte{','}) } i++ } writer.WriteObjectEnd() _, _ = writer.Write([]byte{','}) switch event.GetType() { case models.EventTypeMetric: p.writeMetricValues(writer, event.(*models.Metric)) case models.EventTypeSpan: p.writeSpan(writer, nil) case models.EventTypeLogging: p.writeLogBody(writer, event.(*models.Log)) 
case models.EventTypeByteArray: p.writeByteArray(writer, event.(models.ByteArray)) } writer.WriteObjectEnd() if p.outLogger != nil { p.outLogger.Infof("%s", writer.Buffer()) } else { logger.Info(p.context.GetRuntimeContext(), string(writer.Buffer())) } } } return nil } func (p *FlusherStdout) writeMetricValues(writer *jsoniter.Stream, metric *models.Metric) { writer.WriteObjectField("metricType") writer.WriteString(models.MetricTypeTexts[metric.GetMetricType()]) _, _ = writer.Write([]byte{','}) if metric.GetValue().IsSingleValue() { writer.WriteObjectField("value") writer.WriteFloat64(metric.GetValue().GetSingleValue()) } else { writer.WriteObjectField("values") writer.WriteObjectStart() values := metric.GetValue().GetMultiValues() i := 0 for k, v := range values.Iterator() { writer.WriteObjectField(k) writer.WriteFloat64(v) if i < values.Len()-1 { _, _ = writer.Write([]byte{','}) } i++ } if metric.GetTypedValue().Len() > 0 { _, _ = writer.Write([]byte{','}) i = 0 for k, v := range metric.GetTypedValue().Iterator() { writer.WriteObjectField(k) switch v.Type { case models.ValueTypeString: writer.WriteString(v.Value.(string)) case models.ValueTypeBoolean: writer.WriteBool(v.Value.(bool)) } if i < metric.GetTypedValue().Len()-1 { _, _ = writer.Write([]byte{','}) } i++ } } writer.WriteObjectEnd() } } func (p *FlusherStdout) writeSpan(writer *jsoniter.Stream, metric *models.Span) { // TODO } func (p *FlusherStdout) writeLogBody(writer *jsoniter.Stream, log *models.Log) { writer.WriteObjectField("offset") writer.WriteInt64(int64(log.GetOffset())) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("level") writer.WriteString(log.GetLevel()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("traceID") writer.WriteString(log.GetTraceID()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("traceID") writer.WriteString(log.GetTraceID()) _, _ = writer.Write([]byte{','}) writer.WriteObjectField("spanID") writer.WriteString(log.GetSpanID()) contents := 
log.GetIndices() for key, value := range contents.Iterator() { _, _ = writer.Write([]byte{','}) writer.WriteObjectField(key) _, _ = writer.Write([]byte(fmt.Sprintf("%#v", value))) } } func (p FlusherStdout) writeByteArray(writer *jsoniter.Stream, bytes models.ByteArray) { writer.WriteObjectField("byteArray") _, _ = writer.Write([]byte{'"'}) _, _ = writer.Write(bytes) _, _ = writer.Write([]byte{'"'}) } func (p *FlusherStdout) SetUrgent(flag bool) { } // IsReady is ready to flush func (*FlusherStdout) IsReady(projectName string, logstoreName string, logstoreKey int64) bool { return true } // Stop ... func (p *FlusherStdout) Stop() error { if p.outLogger != nil { p.outLogger.Close() } return nil } // Register the plugin to the Flushers array. func init() { pipeline.Flushers["flusher_stdout"] = func() pipeline.Flusher { return &FlusherStdout{ KeyValuePairs: true, } } }