in exporter/collector/logs.go [403:572]
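// logToSplitEntries converts an OTel log record into one or more Cloud Logging LogEntry
// messages, splitting a string payload across multiple entries when it exceeds the
// configured maximum entry size.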
func (l logMapper) logToSplitEntries(
logRecord plog.LogRecord,
mr *monitoredrespb.MonitoredResource,
logLabels map[string]string,
processTime time.Time,
logName string,
projectID string,
) ([]*logpb.LogEntry, error) {
ts := logRecord.Timestamp().AsTime()
if logRecord.Timestamp() == 0 || ts.IsZero() {
// if timestamp is unset, fall back to observed_time_unix_nano as recommended
// (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179)
if logRecord.ObservedTimestamp() != 0 {
ts = logRecord.ObservedTimestamp().AsTime()
} else {
// if observed_time is 0, use the current time
ts = processTime
}
}
entry := &logpb.LogEntry{
Resource: mr,
Timestamp: timestamppb.New(ts),
Labels: logLabels,
LogName: fmt.Sprintf("projects/%s/logs/%s", projectID, url.PathEscape(logName)),
}
// build our own map off OTel attributes so we don't have to call .Get() for each special case
// (.Get() ranges over all attributes each time)
attrsMap := make(map[string]pcommon.Value)
logRecord.Attributes().Range(func(k string, v pcommon.Value) bool {
attrsMap[k] = v
return true
})
// parse LogEntrySourceLocation struct from OTel attribute
if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
var logEntrySourceLocation logpb.LogEntrySourceLocation
err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation)
if err != nil {
return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
}
entry.SourceLocation = &logEntrySourceLocation
delete(attrsMap, SourceLocationAttributeKey)
}
// parse TraceSampled boolean from OTel attribute or IsSampled OTLP flag
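// (when the attribute is absent, traceSampled is pcommon.Value's zero value; its Bool()
// returns false, so only the OTLP sampled flag contributes in that case)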
if traceSampled, ok := attrsMap[TraceSampledAttributeKey]; ok || logRecord.Flags().IsSampled() {
entry.TraceSampled = (traceSampled.Bool() || logRecord.Flags().IsSampled())
delete(attrsMap, TraceSampledAttributeKey)
}
// parse TraceID and SpanID, if present
if traceID := logRecord.TraceID(); !traceID.IsEmpty() {
entry.Trace = fmt.Sprintf("projects/%s/traces/%s", projectID, hex.EncodeToString(traceID[:]))
}
if spanID := logRecord.SpanID(); !spanID.IsEmpty() {
entry.SpanId = hex.EncodeToString(spanID[:])
}
if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok {
httpRequest, err := l.parseHTTPRequest(httpRequestAttr)
if err != nil {
l.obs.log.Debug("Unable to parse httpRequest", zap.Error(err))
}
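// on a parse failure, httpRequest is presumably nil, leaving the HttpRequest field unset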
entry.HttpRequest = httpRequest
delete(attrsMap, HTTPRequestAttributeKey)
}
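// bounds-check the severity number before using it to index into severityMapping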
if logRecord.SeverityNumber() < 0 || int(logRecord.SeverityNumber()) > len(severityMapping)-1 {
return nil, fmt.Errorf("unknown SeverityNumber %v", logRecord.SeverityNumber())
}
severityNumber := logRecord.SeverityNumber()
// Log severity levels are based on numerical values defined by OTel/GCP, which are
// informally mapped to generic text values such as "ALERT", "Debug", etc.
// A SeverityText value can sometimes be automatically mapped to a matching SeverityNumber.
// If not (for example, when directly setting the SeverityText on a log entry with the
// transform processor), the SeverityText might be something like "ALERT" while the
// SeverityNumber is still 0.
// In that case, we attempt to map the text ourselves to one of the defined OTel
// SeverityNumbers, by checking that the SeverityText is NOT "default" (i.e., it exists
// in our map) and that the SeverityNumber IS 0.
// (This also excludes other unknown/custom severity text values, which may have
// user-defined mappings in the collector.)
if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 {
severityNumber = severityForText
}
entry.Severity = severityMapping[severityNumber]
// Map severityNumber >= 17 (ERROR and above) to a GCP Error Reporting entry if enabled
if severityNumber >= 17 && l.cfg.LogConfig.ErrorReportingType {
if logRecord.Body().Type() != pcommon.ValueTypeMap {
strValue := logRecord.Body().AsString()
logRecord.Body().SetEmptyMap()
logRecord.Body().Map().PutStr("message", strValue)
}
logRecord.Body().Map().PutStr(GCPTypeKey, GCPErrorReportingTypeValue)
}
// parse remaining OTel attributes to GCP labels
for k, v := range attrsMap {
// skip "gcp.*" attributes since we process these to other fields
if strings.HasPrefix(k, "gcp.") {
continue
}
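// labels already set (from logLabels) take precedence over attribute values with the same key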
if _, ok := entry.Labels[k]; !ok {
entry.Labels[k] = v.AsString()
}
}
// Export map and bytes bodies as JSON-structured payloads when they convert successfully;
// otherwise, fall through to the string export below.
switch logRecord.Body().Type() {
case pcommon.ValueTypeMap:
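// structpb.NewStruct can fail, e.g., if string values contain invalid UTF-8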
s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
return []*logpb.LogEntry{entry}, nil
}
l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
case pcommon.ValueTypeBytes:
s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
return []*logpb.LogEntry{entry}, nil
}
l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
}
// For all other ValueTypes, export as a string payload.
// logRecord.Body().AsString() can be expensive, and we use it several times below,
// so call it once and save the result.
logBodyString := logRecord.Body().AsString()
if len(logBodyString) == 0 {
return []*logpb.LogEntry{entry}, nil
}
// Calculate the size of the internal log entry so this overhead can be accounted for
// when determining whether the payload needs to be split.
// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
overheadBytes := proto.Size(entry)
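// Note: the split math below assumes overheadBytes < l.maxEntrySize; if the entry's
// overhead alone reached the max entry size, the denominator would be zero or negative.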
// Split a log entry whose string payload exceeds the max size into multiple entries
splits := int(math.Ceil(float64(len(logBodyString)) / float64(l.maxEntrySize-overheadBytes)))
if splits <= 1 {
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString}
return []*logpb.LogEntry{entry}, nil
}
entries := make([]*logpb.LogEntry, splits)
// Start by assuming all splits will be even (this may not be the case)
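// startIndex and endIndex are byte offsets into logBodyString (Go strings index by byte, not rune)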
startIndex := 0
endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString))))
for i := 0; i < splits; i++ {
newEntry := proto.Clone(entry).(*logpb.LogEntry)
currentSplit := logBodyString[startIndex:endIndex]
// If the current split is larger than the max entry size, shrink it until it fits
// (this may happen since not all characters are exactly 1 byte)
for len(currentSplit) > l.maxEntrySize {
endIndex--
currentSplit = logBodyString[startIndex:endIndex]
}
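// Caveat: decrementing a byte offset may cut a multi-byte UTF-8 character in half
// across adjacent splits.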
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
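// per the LogSplit API, all pieces of a split entry must share the same Uid;
// it is derived here from the log name and timestamp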
newEntry.Split = &logpb.LogSplit{
Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
Index: int32(i),
TotalSplits: int32(splits),
}
entries[i] = newEntry
// Update slice indices to the next chunk
startIndex = endIndex
endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString))))
}
return entries, nil
}