func()

in plugins/processor/logtoslsmetric/processor_log_to_sls_metric.go [171:335]


func (p *ProcessorLogToSlsMetric) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
	var metricLogs []*protocol.Log
TraverseLogArray:
	for _, log := range logArray {
		names := map[string]string{}
		values := map[string]string{}
		// __time_nano__ field
		var timeNano string
		// __labels__ field
		metricLabels := converter.MetricLabels{}
		labelExisted := map[string]bool{}
		for i, cont := range log.Contents {
			if log.Contents[i] == nil {
				continue
			}

			// __labels__
			if cont.Key == metricLabelsKey {
				labels := strings.Split(cont.Value, converter.LabelSeparator)
				for _, label := range labels {
					keyValues := strings.Split(label, converter.KeyValueSeparator)
					if len(keyValues) != 2 {
						p.logError(errInvalidMetricLabelValue)
						continue TraverseLogArray
					}
					key := keyValues[0]
					if p.metricLabelKeysMap[key] {
						p.logError(errFieldRepeated)
						continue TraverseLogArray
					}
					// The Key of Label must follow the regular expression: ^[a-zA-Z_][a-zA-Z0-9_]*$
					if !metricLabelKeyRegex.MatchString(key) {
						p.logError(errInvalidMetricLabelKey)
						continue TraverseLogArray
					}
					value := keyValues[1]
					// The value of Label cannot contain "|" or "#$#".
					if strings.Contains(value, converter.LabelSeparator) || strings.Contains(value, converter.KeyValueSeparator) {
						p.logError(errInvalidMetricLabelValue)
						continue TraverseLogArray
					}
					metricLabels = append(metricLabels, converter.MetricLabel{Key: key, Value: value})
				}
				continue
			}

			// Match to the label field
			if p.metricLabelKeysMap[cont.Key] {
				if labelExisted[cont.Key] {
					p.logError(errFieldRepeated)
					continue TraverseLogArray
				}
				// The value of Label cannot contain "|" or "#$#".
				if strings.Contains(cont.Value, converter.LabelSeparator) || strings.Contains(cont.Value, converter.KeyValueSeparator) {
					p.logError(errInvalidMetricLabelValue)
					continue TraverseLogArray
				}
				labelExisted[cont.Key] = true
				metricLabels = append(metricLabels, converter.MetricLabel{Key: cont.Key, Value: cont.Value})
				continue
			}

			// Match to the name field
			if p.metricNamesMap[cont.Key] {
				// Metric name needs to follow the regular expression: ^[a-zA-Z_:][a-zA-Z0-9_:]*$
				if !metricNameRegex.MatchString(cont.Value) {
					p.logError(errInvalidMetricName)
					continue TraverseLogArray
				}
				names[cont.Key] = cont.Value
				continue
			}

			// Match to the value field
			if p.metricValuesMap[cont.Key] {
				// Metric value needs to be a float type string.
				if !canParseToFloat64(cont.Value) {
					p.logError(errInvalidMetricValue)
					continue TraverseLogArray
				}
				values[cont.Key] = cont.Value
				continue
			}

			if p.MetricTimeKey != "" && cont.Key == p.MetricTimeKey {
				if !isTimeNano(cont.Value) {
					p.logError(errInvalidMetricTime)
					continue TraverseLogArray
				}
				switch len(cont.Value) {
				case 19:
					// nanosecond
					timeNano = cont.Value
				case 16:
					// microsecond
					timeNano = cont.Value + "000"
				case 13:
					// millisecond
					timeNano = cont.Value + "000000"
				case 10:
					// second
					timeNano = cont.Value + "000000000"
				}
				continue
			}
		}

		if timeNano == "" {
			if p.MetricTimeKey != "" {
				p.logError(errInvalidMetricTime)
				continue TraverseLogArray
			}
			timeNano = GetLogTimeNano(log)
		}

		// The number of labels must be equal to the number of label fields.
		if len(labelExisted) != len(p.MetricLabelKeys) {
			p.logError(errInvalidMetricLabelKeyCount)
			continue TraverseLogArray
		}

		// The number of names must be equal to the number of name fields.
		if len(names) != len(p.metricNamesMap) {
			p.logError(errInvalidMetricNameCount)
			continue TraverseLogArray
		}

		// The number of values must be equal to the number of value fields.
		if len(values) != len(p.metricValuesMap) {
			p.logError(errInvalidMetricValueCount)
			continue TraverseLogArray
		}

		for key, value := range p.CustomMetricLabels {
			metricLabels = append(metricLabels, converter.MetricLabel{Key: key, Value: value})
		}

		metricLabel := metricLabels.GetLabel()

		for name, value := range p.MetricValues {
			metricLog := &protocol.Log{
				Time:     log.Time,
				Contents: nil,
			}
			metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{
				Key:   metricLabelsKey,
				Value: metricLabel,
			})
			metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{
				Key:   metricNameKey,
				Value: names[name],
			})
			metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{
				Key:   metricValueKey,
				Value: values[value],
			})
			metricLog.Contents = append(metricLog.Contents, &protocol.Log_Content{
				Key:   metricTimeNanoKey,
				Value: timeNano,
			})
			metricLogs = append(metricLogs, metricLog)
		}
	}
	return metricLogs
}