in plugins/processor/logtoslsmetric/processor_log_to_sls_metric.go [171:335]
// ProcessLogs converts each input log into SLS metric logs.
//
// The contents of every log are scanned once and classified by key, in this
// priority order: the raw labels field (metricLabelsKey), a configured label
// field, a configured name field, a configured value field, and finally the
// configured time field (p.MetricTimeKey). Contents matching none of these are
// ignored. Whenever a field fails validation, or the collected label/name/value
// counts do not match the configuration, the error is reported through
// p.logError and the whole log is skipped.
//
// For each accepted log, one metric log is emitted per entry of p.MetricValues,
// carrying the labels, name, value and nanosecond timestamp contents in that
// order. The returned slice is nil when no log produced a metric.
func (p *ProcessorLogToSlsMetric) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
	// hasSeparator reports whether s contains the label separator or the
	// key/value separator, neither of which is allowed inside a label value.
	hasSeparator := func(s string) bool {
		return strings.Contains(s, converter.LabelSeparator) || strings.Contains(s, converter.KeyValueSeparator)
	}

	var result []*protocol.Log
TraverseLogArray:
	for _, entry := range logArray {
		nameByField := make(map[string]string)
		valueByField := make(map[string]string)
		// Configured label fields already seen in this log.
		seenLabelField := make(map[string]bool)
		// Labels gathered for the __labels__ content of the output.
		collected := converter.MetricLabels{}
		// Value for the __time_nano__ content; empty until a time field is accepted.
		timeNano := ""

		for _, field := range entry.Contents {
			if field == nil {
				continue
			}
			switch {
			case field.Key == metricLabelsKey:
				// Pre-formatted labels: split into pairs and validate each one.
				for _, pair := range strings.Split(field.Value, converter.LabelSeparator) {
					parts := strings.Split(pair, converter.KeyValueSeparator)
					if len(parts) != 2 {
						p.logError(errInvalidMetricLabelValue)
						continue TraverseLogArray
					}
					labelKey, labelValue := parts[0], parts[1]
					// A key inside the labels field must not collide with a configured label field.
					if p.metricLabelKeysMap[labelKey] {
						p.logError(errFieldRepeated)
						continue TraverseLogArray
					}
					// Label keys have to match metricLabelKeyRegex.
					if !metricLabelKeyRegex.MatchString(labelKey) {
						p.logError(errInvalidMetricLabelKey)
						continue TraverseLogArray
					}
					if hasSeparator(labelValue) {
						p.logError(errInvalidMetricLabelValue)
						continue TraverseLogArray
					}
					collected = append(collected, converter.MetricLabel{Key: labelKey, Value: labelValue})
				}

			case p.metricLabelKeysMap[field.Key]:
				// Configured label field: each one may appear only once per log.
				if seenLabelField[field.Key] {
					p.logError(errFieldRepeated)
					continue TraverseLogArray
				}
				if hasSeparator(field.Value) {
					p.logError(errInvalidMetricLabelValue)
					continue TraverseLogArray
				}
				seenLabelField[field.Key] = true
				collected = append(collected, converter.MetricLabel{Key: field.Key, Value: field.Value})

			case p.metricNamesMap[field.Key]:
				// Configured name field: the metric name has to match metricNameRegex.
				if !metricNameRegex.MatchString(field.Value) {
					p.logError(errInvalidMetricName)
					continue TraverseLogArray
				}
				nameByField[field.Key] = field.Value

			case p.metricValuesMap[field.Key]:
				// Configured value field: the content has to parse as a float64.
				if !canParseToFloat64(field.Value) {
					p.logError(errInvalidMetricValue)
					continue TraverseLogArray
				}
				valueByField[field.Key] = field.Value

			case p.MetricTimeKey != "" && field.Key == p.MetricTimeKey:
				if !isTimeNano(field.Value) {
					p.logError(errInvalidMetricTime)
					continue TraverseLogArray
				}
				// Pad the timestamp with zeros up to 19 digits; any other
				// length leaves timeNano untouched.
				switch len(field.Value) {
				case 10:
					// second
					timeNano = field.Value + "000000000"
				case 13:
					// millisecond
					timeNano = field.Value + "000000"
				case 16:
					// microsecond
					timeNano = field.Value + "000"
				case 19:
					// nanosecond
					timeNano = field.Value
				}
			}
		}

		if timeNano == "" {
			// A configured time field that yielded nothing is an error;
			// without one, fall back to the timestamp of the log itself.
			if p.MetricTimeKey != "" {
				p.logError(errInvalidMetricTime)
				continue
			}
			timeNano = GetLogTimeNano(entry)
		}

		// Every configured label, name and value field has to be present.
		switch {
		case len(seenLabelField) != len(p.MetricLabelKeys):
			p.logError(errInvalidMetricLabelKeyCount)
			continue
		case len(nameByField) != len(p.metricNamesMap):
			p.logError(errInvalidMetricNameCount)
			continue
		case len(valueByField) != len(p.metricValuesMap):
			p.logError(errInvalidMetricValueCount)
			continue
		}

		// Static labels from the configuration are added to every metric.
		for labelKey, labelValue := range p.CustomMetricLabels {
			collected = append(collected, converter.MetricLabel{Key: labelKey, Value: labelValue})
		}
		labelText := collected.GetLabel()

		// Emit one metric log per configured (name field, value field) pair.
		for nameField, valueField := range p.MetricValues {
			contents := []*protocol.Log_Content{
				{Key: metricLabelsKey, Value: labelText},
				{Key: metricNameKey, Value: nameByField[nameField]},
				{Key: metricValueKey, Value: valueByField[valueField]},
				{Key: metricTimeNanoKey, Value: timeNano},
			}
			result = append(result, &protocol.Log{Time: entry.Time, Contents: contents})
		}
	}
	return result
}