func()

in exporter/collector/logs.go [403:572]


func (l logMapper) logToSplitEntries(
	logRecord plog.LogRecord,
	mr *monitoredrespb.MonitoredResource,
	logLabels map[string]string,
	processTime time.Time,
	logName string,
	projectID string,
) ([]*logpb.LogEntry, error) {
	ts := logRecord.Timestamp().AsTime()
	if logRecord.Timestamp() == 0 || ts.IsZero() {
		// if timestamp is unset, fall back to observed_time_unix_nano as recommended
		//   (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179)
		if logRecord.ObservedTimestamp() != 0 {
			ts = logRecord.ObservedTimestamp().AsTime()
		} else {
			// if observed_time is 0, use the current time
			ts = processTime
		}
	}

	entry := &logpb.LogEntry{
		Resource:  mr,
		Timestamp: timestamppb.New(ts),
		Labels:    logLabels,
		LogName:   fmt.Sprintf("projects/%s/logs/%s", projectID, url.PathEscape(logName)),
	}

	// build our own map off OTel attributes so we don't have to call .Get() for each special case
	// (.Get() ranges over all attributes each time)
	attrsMap := make(map[string]pcommon.Value)
	logRecord.Attributes().Range(func(k string, v pcommon.Value) bool {
		attrsMap[k] = v
		return true
	})

	// parse LogEntrySourceLocation struct from OTel attribute
	if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
		var logEntrySourceLocation logpb.LogEntrySourceLocation
		err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation)
		if err != nil {
			return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
		}
		entry.SourceLocation = &logEntrySourceLocation
		delete(attrsMap, SourceLocationAttributeKey)
	}

	// parse TraceSampled boolean from OTel attribute or IsSampled OTLP flag
	if traceSampled, ok := attrsMap[TraceSampledAttributeKey]; ok || logRecord.Flags().IsSampled() {
		entry.TraceSampled = (traceSampled.Bool() || logRecord.Flags().IsSampled())
		delete(attrsMap, TraceSampledAttributeKey)
	}

	// parse TraceID and SpanID, if present
	if traceID := logRecord.TraceID(); !traceID.IsEmpty() {
		entry.Trace = fmt.Sprintf("projects/%s/traces/%s", projectID, hex.EncodeToString(traceID[:]))
	}
	if spanID := logRecord.SpanID(); !spanID.IsEmpty() {
		entry.SpanId = hex.EncodeToString(spanID[:])
	}

	if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok {
		httpRequest, err := l.parseHTTPRequest(httpRequestAttr)
		if err != nil {
			l.obs.log.Debug("Unable to parse httpRequest", zap.Error(err))
		}
		entry.HttpRequest = httpRequest
		delete(attrsMap, HTTPRequestAttributeKey)
	}

	if logRecord.SeverityNumber() < 0 || int(logRecord.SeverityNumber()) > len(severityMapping)-1 {
		return nil, fmt.Errorf("unknown SeverityNumber %v", logRecord.SeverityNumber())
	}
	severityNumber := logRecord.SeverityNumber()
	// Log severity levels are based on numerical values defined by Otel/GCP, which are informally mapped to generic text values such as "ALERT", "Debug", etc.
	// In some cases, a SeverityText value can be automatically mapped to a matching SeverityNumber.
	// If not (for example, when directly setting the SeverityText on a Log entry with the Transform processor), then the
	// SeverityText might be something like "ALERT" while the SeverityNumber is still "0".
	// In this case, we will attempt to map the text ourselves to one of the defined Otel SeverityNumbers.
	// We do this by checking that the SeverityText is NOT "default" (ie, it exists in our map) and that the SeverityNumber IS "0".
	// (This also excludes other unknown/custom severity text values, which may have user-defined mappings in the collector)
	if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 {
		severityNumber = severityForText
	}
	entry.Severity = severityMapping[severityNumber]

	// Parse severityNumber > 17 (error) to a GCP Error Reporting entry if enabled
	if severityNumber >= 17 && l.cfg.LogConfig.ErrorReportingType {
		if logRecord.Body().Type() != pcommon.ValueTypeMap {
			strValue := logRecord.Body().AsString()
			logRecord.Body().SetEmptyMap()
			logRecord.Body().Map().PutStr("message", strValue)
		}
		logRecord.Body().Map().PutStr(GCPTypeKey, GCPErrorReportingTypeValue)
	}

	// parse remaining OTel attributes to GCP labels
	for k, v := range attrsMap {
		// skip "gcp.*" attributes since we process these to other fields
		if strings.HasPrefix(k, "gcp.") {
			continue
		}
		if _, ok := entry.Labels[k]; !ok {
			entry.Labels[k] = v.AsString()
		}
	}

	// Handle map and bytes as JSON-structured logs if they are successfully converted.
	switch logRecord.Body().Type() {
	case pcommon.ValueTypeMap:
		s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
		if err == nil {
			entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
			return []*logpb.LogEntry{entry}, nil
		}
		l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
	case pcommon.ValueTypeBytes:
		s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
		if err == nil {
			entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
			return []*logpb.LogEntry{entry}, nil
		}
		l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
	}
	// For all other ValueTypes, export as a string payload.

	// log.Body().AsString() can be expensive, and we use it several times below this, so
	// do it once and save that as a variable.
	logBodyString := logRecord.Body().AsString()
	if len(logBodyString) == 0 {
		return []*logpb.LogEntry{entry}, nil
	}

	// Calculate the size of the internal log entry so this overhead can be accounted
	// for when determining the need to split based on payload size
	// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
	overheadBytes := proto.Size(entry)
	// Split log entries with a string payload into fewer entries
	splits := int(math.Ceil(float64(len([]byte(logBodyString))) / float64(l.maxEntrySize-overheadBytes)))
	if splits <= 1 {
		entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString}
		return []*logpb.LogEntry{entry}, nil
	}
	entries := make([]*logpb.LogEntry, splits)
	// Start by assuming all splits will be even (this may not be the case)
	startIndex := 0
	endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString))))
	for i := 0; i < splits; i++ {
		newEntry := proto.Clone(entry).(*logpb.LogEntry)
		currentSplit := logBodyString[startIndex:endIndex]

		// If the current split is larger than the entry size, iterate until it is within the max
		// (This may happen since not all characters are exactly 1 byte)
		for len([]byte(currentSplit)) > l.maxEntrySize {
			endIndex--
			currentSplit = logBodyString[startIndex:endIndex]
		}
		newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
		newEntry.Split = &logpb.LogSplit{
			Uid:         fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
			Index:       int32(i),
			TotalSplits: int32(splits),
		}
		entries[i] = newEntry

		// Update slice indices to the next chunk
		startIndex = endIndex
		endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString))))
	}
	return entries, nil
}