func()

in components/google-built-opentelemetry-collector/exporter/googleservicecontrolexporter/logs.go [254:385]


func (l logMapper) parseLogEntry(logRecord plog.LogRecord, processTime time.Time) (*scpb.LogEntry, error) {
	ts := logRecord.Timestamp().AsTime()
	if logRecord.Timestamp() == 0 || ts.IsZero() {
		// if timestamp is unset, fall back to observed_time_unix_nano as recommended
		//   (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179)
		if logRecord.ObservedTimestamp() != 0 {
			ts = logRecord.ObservedTimestamp().AsTime()
		} else {
			// if observed_time is 0, use the process time, which is the current time
			ts = processTime
		}
	}

	logName, err := l.getLogName(logRecord)
	if err != nil {
		return nil, err
	}

	entry := &scpb.LogEntry{
		Name:      logName,
		Timestamp: timestamppb.New(ts),
		Labels:    map[string]string{},
	}

	// build our own map of the OTel attributes so we don't have to call .Get() for each special case
	// (.Get() ranges over all attributes each time)
	attrsMap := make(map[string]pcommon.Value)
	logRecord.Attributes().Range(func(k string, v pcommon.Value) bool {
		attrsMap[k] = v
		return true
	})

	// Parse LogEntry InsertId struct from OTel attribute
	// TODO(lujieduan): we should evaluate whether we need to parse insertId from
	// the logs: the FluentBit plugin parses it, but the OTel Cloud Logging
	// exporter does not and lets the API assign the InsertId.
	if insertIdAttr, ok := attrsMap[InsertIdAttributeKey]; ok {
		entry.InsertId = insertIdAttr.AsString()
		delete(attrsMap, InsertIdAttributeKey)
	}
	// When insertId is not present in the attributes, FluentBit generates
	// UUIDs in the exporter; here we leave it blank and let the server
	// assign new UUIDs - same end result, and easier to test.

	// parse LogEntrySourceLocation struct from OTel attribute
	if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
		var logEntrySourceLocation scpb.LogEntrySourceLocation
		err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation)
		if err != nil {
			return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
		}
		entry.SourceLocation = &logEntrySourceLocation
		delete(attrsMap, SourceLocationAttributeKey)
	}
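	// Illustrative note (assumption, not from the original source): the attribute
	// value is expected to unmarshal into scpb.LogEntrySourceLocation, e.g. a
	// JSON-like payload carrying file/line/function fields; the exact encodings
	// accepted depend on unmarshalAttribute, which is not shown in this excerpt.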

	// parse HttpRequest
	if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok {
		httpRequest, err := l.parseHTTPRequest(httpRequestAttr)
		if err != nil {
			l.logger.Warn("Unable to parse httpRequest", zap.Error(err))
		}
		entry.HttpRequest = httpRequest
		delete(attrsMap, HTTPRequestAttributeKey)
	}

	// parse Severity
	severityNumber := logRecord.SeverityNumber()
	// Log severity levels are based on numerical values defined by OTel/GCP, which are informally mapped to generic text values such as "ALERT", "DEBUG", etc.
	// In some cases, a SeverityText value is automatically mapped to a matching SeverityNumber.
	// If not (for example, when the SeverityText is set directly on a log record with the transform processor), the
	// SeverityText might be something like "ALERT" while the SeverityNumber is still 0.
	// In that case, we attempt to map the text ourselves to one of the defined OTel SeverityNumbers.
	// We do this by checking that the SeverityText is NOT "default" (i.e., it exists in our map) and that the SeverityNumber IS 0.
	// (This also excludes other unknown/custom severity text values, which may have user-defined mappings in the collector.)
	if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 {
		severityNumber = severityForText
	}
	entry.Severity, err = severityMapping(severityNumber)
	if err != nil {
		l.logger.Warn(fmt.Sprintf("error parsing severity %v", logRecord.SeverityNumber()), zap.Error(err))
	}
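	// Example (illustrative): a record produced with SeverityText "ALERT" but
	// SeverityNumber 0 (e.g. set via the transform processor) takes the fallback
	// above, so severityMapping receives the number looked up from
	// otelSeverityForText rather than 0.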

	// parse remaining OTel attributes to GCP labels
	for k, v := range attrsMap {
		if k == LogNameAttributeKey {
			continue
		}
		if _, ok := entry.Labels[k]; !ok {
			entry.Labels[k] = v.AsString()
		}
	}

	// Handle map and bytes bodies as JSON-structured logs if they can be converted successfully.
	switch logRecord.Body().Type() {
	case pcommon.ValueTypeMap:
		s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
		if err == nil {
			entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s}
			return entry, nil
		}
		l.logger.Debug(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
	case pcommon.ValueTypeBytes:
		s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
		if err == nil {
			entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s}
			return entry, nil
		}
		l.logger.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
	}
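	// Example (illustrative): a map body such as {"msg": "ok", "code": 200}
	// becomes a StructPayload above; if the conversion fails, execution falls
	// through to the text-payload path below and the body is exported as a string.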

	// The LogEntry.trace, LogEntry.operation, and LogEntry.protoPayload fields
	// are not parsed.
	// The Service Control LogEntry does not contain traceId, spanId, or traceSampled.

	logBodyString := logRecord.Body().AsString()
	if len(logBodyString) == 0 {
		return entry, nil
	}

	// The Service Control LogEntry representation does not support splitting
	// long entries; FluentBit drops them.
	overheadBytes := proto.Size(entry)
	if (len(logBodyString) + overheadBytes) > l.maxEntrySize {
		return nil, fmt.Errorf("entry size is too big: got: %d bytes, want: < %d bytes; timestamp: %s",
			len(logBodyString)+overheadBytes,
			l.maxEntrySize,
			entry.Timestamp)
	}
	entry.Payload = &scpb.LogEntry_TextPayload{TextPayload: logBodyString}

	return entry, nil
}
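
// Usage sketch (illustrative, not part of the original file). It shows how a
// caller-side plog.LogRecord drives the attribute handling above; the logMapper
// value `l` is assumed to be constructed elsewhere, since its fields are not
// shown in this excerpt.
//
//	lr := plog.NewLogRecord()
//	lr.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
//	lr.SetSeverityText("ERROR") // SeverityNumber left at 0, so the text fallback applies
//	lr.Attributes().PutStr(InsertIdAttributeKey, "abc-123")
//	lr.Body().SetStr("request failed")
//
//	entry, err := l.parseLogEntry(lr, time.Now())
//	// On success: entry.InsertId == "abc-123" and entry.Payload is a
//	// TextPayload containing "request failed".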