pkg/protocol/converter/converter.go (269 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package protocol
import (
"fmt"
"strings"
"sync"
"github.com/alibaba/ilogtail/pkg/config"
"github.com/alibaba/ilogtail/pkg/flags"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/protocol"
)
const (
ProtocolCustomSingle = "custom_single"
ProtocolCustomSingleFlatten = "custom_single_flatten"
ProtocolOtlpV1 = "otlp_v1"
ProtocolInfluxdb = "influxdb"
ProtocolJsonline = "jsonline"
ProtocolRaw = "raw"
)
const (
EncodingNone = "none"
EncodingJSON = "json"
EncodingProtobuf = "protobuf"
EncodingCustom = "custom"
)
const (
tagPrefix = "__tag__:"
targetContentPrefix = "content."
targetTagPrefix = "tag."
targetGroupMetadataPrefix = "metadata."
)
const (
tagHostIP = "host.ip"
tagLogTopic = "log.topic"
tagLogFilePath = "log.file.path"
tagHostname = "host.name"
tagK8sNodeIP = "k8s.node.ip"
tagK8sNodeName = "k8s.node.name"
tagK8sNamespace = "k8s.namespace.name"
tagK8sPodName = "k8s.pod.name"
tagK8sPodIP = "k8s.pod.ip"
tagK8sPodUID = "k8s.pod.uid"
tagContainerName = "container.name"
tagContainerIP = "container.ip"
tagContainerImageName = "container.image.name"
tagK8sContainerName = "k8s.container.name"
tagK8sContainerIP = "k8s.container.ip"
tagK8sContainerImageName = "k8s.container.image.name"
)
// todo: make multiple pools for different size levels
var byteBufPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 0, 1024)
return &buf
},
}
var tagConversionMap = map[string]string{
"__path__": tagLogFilePath,
"__hostname__": tagHostname,
"_node_ip_": tagK8sNodeIP,
"_node_name_": tagK8sNodeName,
"_namespace_": tagK8sNamespace,
"_pod_name_": tagK8sPodName,
"_pod_ip_": tagK8sPodIP,
"_pod_uid_": tagK8sPodUID,
"_container_name_": tagContainerName,
"_container_ip_": tagContainerIP,
"_image_name_": tagContainerImageName,
}
// When in k8s, the following tags should be renamed to k8s-specific names.
var specialTagConversionMap = map[string]string{
"_container_name_": tagK8sContainerName,
"_container_ip_": tagK8sContainerIP,
"_image_name_": tagK8sContainerImageName,
}
var supportedEncodingMap = map[string]map[string]bool{
ProtocolCustomSingle: {
EncodingJSON: true,
EncodingProtobuf: false,
},
ProtocolCustomSingleFlatten: {
EncodingJSON: true,
EncodingProtobuf: false,
},
ProtocolOtlpV1: {
EncodingNone: true,
},
ProtocolInfluxdb: {
EncodingCustom: true,
},
ProtocolJsonline: {
EncodingJSON: true,
},
ProtocolRaw: {
EncodingCustom: true,
},
}
type Converter struct {
Protocol string
Encoding string
Separator string
IgnoreUnExpectedData bool
OnlyContents bool
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
GlobalConfig *config.GlobalConfig
}
func NewConverterWithSep(protocol, encoding, sep string, ignoreUnExpectedData bool, tagKeyRenameMap, protocolKeyRenameMap map[string]string, globalConfig *config.GlobalConfig) (*Converter, error) {
converter, err := NewConverter(protocol, encoding, tagKeyRenameMap, protocolKeyRenameMap, globalConfig)
if err != nil {
return nil, err
}
converter.Separator = sep
converter.IgnoreUnExpectedData = ignoreUnExpectedData
return converter, nil
}
func NewConverter(protocol, encoding string, tagKeyRenameMap, protocolKeyRenameMap map[string]string, globalConfig *config.GlobalConfig) (*Converter, error) {
enc, ok := supportedEncodingMap[protocol]
if !ok {
return nil, fmt.Errorf("unsupported protocol: %s", protocol)
}
if supported, existed := enc[encoding]; !existed || !supported {
return nil, fmt.Errorf("unsupported encoding: %s for protocol %s", encoding, protocol)
}
return &Converter{
Protocol: protocol,
Encoding: encoding,
TagKeyRenameMap: tagKeyRenameMap,
ProtocolKeyRenameMap: protocolKeyRenameMap,
GlobalConfig: globalConfig,
}, nil
}
func (c *Converter) Do(logGroup *protocol.LogGroup) (logs interface{}, err error) {
logs, _, err = c.DoWithSelectedFields(logGroup, nil)
return
}
func (c *Converter) DoWithSelectedFields(logGroup *protocol.LogGroup, targetFields []string) (logs interface{}, values []map[string]string, err error) {
switch c.Protocol {
case ProtocolCustomSingle:
return c.ConvertToSingleProtocolLogs(logGroup, targetFields)
case ProtocolCustomSingleFlatten:
return c.ConvertToSingleProtocolLogsFlatten(logGroup, targetFields)
case ProtocolOtlpV1:
return c.ConvertToOtlpResourseLogs(logGroup, targetFields)
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
}
}
func (c *Converter) ToByteStream(logGroup *protocol.LogGroup) (stream interface{}, err error) {
stream, _, err = c.ToByteStreamWithSelectedFields(logGroup, nil)
return
}
func (c *Converter) ToByteStreamWithSelectedFields(logGroup *protocol.LogGroup, targetFields []string) (stream interface{}, values []map[string]string, err error) {
switch c.Protocol {
case ProtocolCustomSingle:
return c.ConvertToSingleProtocolStream(logGroup, targetFields)
case ProtocolCustomSingleFlatten:
return c.ConvertToSingleProtocolStreamFlatten(logGroup, targetFields)
case ProtocolInfluxdb:
return c.ConvertToInfluxdbProtocolStream(logGroup, targetFields)
case ProtocolJsonline:
return c.ConvertToJsonlineProtocolStreamFlatten(logGroup)
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
}
}
func (c *Converter) ToByteStreamWithSelectedFieldsV2(groupEvents *models.PipelineGroupEvents, targetFields []string) (stream interface{}, values []map[string]string, err error) {
switch c.Protocol {
case ProtocolRaw:
return c.ConvertToRawStream(groupEvents, targetFields)
case ProtocolInfluxdb:
return c.ConvertToInfluxdbProtocolStreamV2(groupEvents, targetFields)
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
}
}
func GetPooledByteBuf() *[]byte {
return byteBufPool.Get().(*[]byte)
}
func PutPooledByteBuf(buf *[]byte) {
*buf = (*buf)[:0]
byteBufPool.Put(buf)
}
func TrimPrefix(str string) string {
switch {
case strings.HasPrefix(str, targetContentPrefix):
return strings.TrimPrefix(str, targetContentPrefix)
case strings.HasPrefix(str, targetTagPrefix):
return strings.TrimPrefix(str, targetTagPrefix)
default:
return str
}
}
func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic string, tagKeyRenameMap map[string]string) (map[string]string, map[string]string) {
contents, tags := make(map[string]string), make(map[string]string)
for _, logContent := range log.Contents {
switch logContent.Key {
case "__log_topic__":
addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, logContent.Value)
case tagPrefix + "__user_defined_id__":
continue
default:
var tagName string
if strings.HasPrefix(logContent.Key, tagPrefix) {
tagName = logContent.Key[len(tagPrefix):]
if _, ok := specialTagConversionMap[tagName]; *flags.K8sFlag && ok {
tagName = specialTagConversionMap[tagName]
} else if _, ok := tagConversionMap[tagName]; ok {
tagName = tagConversionMap[tagName]
}
} else {
if _, ok := specialTagConversionMap[logContent.Key]; *flags.K8sFlag && ok {
tagName = specialTagConversionMap[logContent.Key]
} else if _, ok := tagConversionMap[logContent.Key]; ok {
tagName = tagConversionMap[logContent.Key]
}
}
if len(tagName) != 0 {
addTagIfRequired(tags, tagKeyRenameMap, tagName, logContent.Value)
} else {
contents[logContent.Key] = logContent.Value
}
}
}
for _, logTag := range logTags {
if logTag.Key == "__user_defined_id__" || logTag.Key == "__pack_id__" {
continue
}
tagName := logTag.Key
if _, ok := specialTagConversionMap[logTag.Key]; *flags.K8sFlag && ok {
tagName = specialTagConversionMap[logTag.Key]
} else if _, ok := tagConversionMap[logTag.Key]; ok {
tagName = tagConversionMap[logTag.Key]
}
addTagIfRequired(tags, tagKeyRenameMap, tagName, logTag.Value)
}
addTagIfRequired(tags, tagKeyRenameMap, tagHostIP, src)
if topic != "" {
addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, topic)
}
return contents, tags
}
func findTargetValues(targetFields []string, contents, tags, tagKeyRenameMap map[string]string) (map[string]string, error) {
if len(targetFields) == 0 {
return nil, nil
}
desiredValue := make(map[string]string, len(targetFields))
for _, field := range targetFields {
switch {
case strings.HasPrefix(field, targetContentPrefix):
if value, ok := contents[field[len(targetContentPrefix):]]; ok {
desiredValue[field] = value
}
case strings.HasPrefix(field, targetTagPrefix):
if value, ok := tags[field[len(targetTagPrefix):]]; ok {
desiredValue[field] = value
} else if value, ok := tagKeyRenameMap[field[len(targetTagPrefix):]]; ok {
desiredValue[field] = tags[value]
}
default:
return nil, fmt.Errorf("unsupported field: %s", field)
}
}
return desiredValue, nil
}
func addTagIfRequired(tags, tagKeyRenameMap map[string]string, key, value string) {
if newKey, ok := tagKeyRenameMap[key]; ok && len(newKey) != 0 {
tags[newKey] = value
} else if !ok {
tags[key] = value
}
}