in components/google-built-opentelemetry-collector/exporter/googleservicecontrolexporter/logs.go [254:385]
// parseLogEntry converts a single OTel log record into a Service Control
// LogEntry.
//
// The entry timestamp is taken from the record timestamp, falling back to the
// observed timestamp and finally to processTime. The attributes keyed by
// InsertIdAttributeKey, SourceLocationAttributeKey and HTTPRequestAttributeKey
// are lifted into their dedicated LogEntry fields; every remaining attribute
// except LogNameAttributeKey is copied into the entry labels as a string.
// Map and bytes bodies are exported as a struct payload when they convert
// cleanly; anything else is exported as a text payload.
//
// An error is returned when the log name cannot be resolved, when the source
// location attribute cannot be unmarshalled, or when a text payload plus the
// serialized size of the rest of the entry exceeds l.maxEntrySize. Failures to
// parse the HTTP request or the severity are only logged.
func (l logMapper) parseLogEntry(logRecord plog.LogRecord, processTime time.Time) (*scpb.LogEntry, error) {
	ts := logRecord.Timestamp().AsTime()
	if logRecord.Timestamp() == 0 || ts.IsZero() {
		// if timestamp is unset, fall back to observed_time_unix_nano as recommended
		// (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179)
		if logRecord.ObservedTimestamp() != 0 {
			ts = logRecord.ObservedTimestamp().AsTime()
		} else {
			// if observed_time is 0, use the process time, which is the current time
			ts = processTime
		}
	}

	logName, err := l.getLogName(logRecord)
	if err != nil {
		return nil, err
	}

	entry := &scpb.LogEntry{
		Name:      logName,
		Timestamp: timestamppb.New(ts),
		Labels:    map[string]string{},
	}

	// Build our own map off the OTel attributes so we don't have to call .Get()
	// for each special case (.Get() ranges over all attributes each time).
	// The attribute count is known up front, so size the map once.
	attrs := logRecord.Attributes()
	attrsMap := make(map[string]pcommon.Value, attrs.Len())
	attrs.Range(func(k string, v pcommon.Value) bool {
		attrsMap[k] = v
		return true
	})

	// Parse LogEntry InsertId from the OTel attribute.
	// TODO(lujieduan): we should evaluate if we need to parse insertId from
	// the logs: the FluentBit plugin parses this but the OTel Cloud Logging
	// exporter does not and lets the API assign the InsertId.
	//
	// When insertId is not present in the attributes, FluentBit would generate
	// UUIDs in the exporter; here we just leave it blank and let the server
	// assign new UUIDs - same end result, easier to test.
	if insertIDAttr, ok := attrsMap[InsertIdAttributeKey]; ok {
		entry.InsertId = insertIDAttr.AsString()
		delete(attrsMap, InsertIdAttributeKey)
	}

	// Parse the LogEntrySourceLocation struct from the OTel attribute. A
	// malformed source location rejects the whole entry.
	if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
		var logEntrySourceLocation scpb.LogEntrySourceLocation
		if err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation); err != nil {
			return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
		}
		entry.SourceLocation = &logEntrySourceLocation
		delete(attrsMap, SourceLocationAttributeKey)
	}

	// Parse HttpRequest. A parse failure is logged only; whatever
	// parseHTTPRequest returned is still stored and the attribute is still
	// consumed so it does not end up in the labels.
	if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok {
		httpRequest, err := l.parseHTTPRequest(httpRequestAttr)
		if err != nil {
			l.logger.Warn("Unable to parse httpRequest", zap.Error(err))
		}
		entry.HttpRequest = httpRequest
		delete(attrsMap, HTTPRequestAttributeKey)
	}

	// Parse Severity.
	// Log severity levels are based on numerical values defined by Otel/GCP, which are informally mapped to generic text values such as "ALERT", "Debug", etc.
	// In some cases, a SeverityText value can be automatically mapped to a matching SeverityNumber.
	// If not (for example, when directly setting the SeverityText on a Log entry with the Transform processor), then the
	// SeverityText might be something like "ALERT" while the SeverityNumber is still "0".
	// In this case, we will attempt to map the text ourselves to one of the defined Otel SeverityNumbers.
	// We do this by checking that the SeverityText is NOT "default" (ie, it exists in our map) and that the SeverityNumber IS "0".
	// (This also excludes other unknown/custom severity text values, which may have user-defined mappings in the collector)
	severityNumber := logRecord.SeverityNumber()
	if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 {
		severityNumber = severityForText
	}
	entry.Severity, err = severityMapping(severityNumber)
	if err != nil {
		// Best effort: the entry is still exported with whatever severity
		// severityMapping returned alongside the error.
		l.logger.Warn(fmt.Sprintf("error parsing severity %v with error: %s", logRecord.SeverityNumber(), err))
	}

	// Copy the remaining OTel attributes to GCP labels. entry.Labels is still
	// empty here and attrsMap keys are unique, so no collision check is needed.
	for k, v := range attrsMap {
		if k == LogNameAttributeKey {
			continue
		}
		entry.Labels[k] = v.AsString()
	}

	// Handle map and bytes as JSON-structured logs if they are successfully converted.
	// NOTE(review): structured payloads return here without the maxEntrySize
	// check that is applied to text payloads below - confirm this is intended.
	body := logRecord.Body()
	switch body.Type() {
	case pcommon.ValueTypeMap:
		s, err := structpb.NewStruct(body.Map().AsRaw())
		if err == nil {
			entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s}
			return entry, nil
		}
		l.logger.Debug(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
	case pcommon.ValueTypeBytes:
		s, err := toProtoStruct(body.Bytes().AsRaw())
		if err == nil {
			entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s}
			return entry, nil
		}
		l.logger.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
	}

	// Fields: LogEntry.trace, LogEntry.operation, LogEntry.protoPayload
	// are not parsed.
	// Service Control LogEntry does not contain: traceId, SpanId, traceSampled.
	logBodyString := body.AsString()
	if len(logBodyString) == 0 {
		// Nothing to export as payload; the entry carries metadata only.
		return entry, nil
	}

	// Service Control LogEntry representation does not support
	// splits. In FluentBit, long log entries are dropped.
	// len() of a string is already its size in bytes, so no []byte conversion
	// is needed; proto.Size(entry) accounts for everything but the payload.
	// NOTE(review): the message says "want: <" while the check accepts a size
	// equal to maxEntrySize; the text is kept unchanged for compatibility.
	entrySize := len(logBodyString) + proto.Size(entry)
	if entrySize > l.maxEntrySize {
		return nil, fmt.Errorf("entry size is too big: got: %d bytes, want: < %d bytes; timestamp: %s",
			entrySize,
			l.maxEntrySize,
			entry.Timestamp)
	}

	entry.Payload = &scpb.LogEntry_TextPayload{TextPayload: logBodyString}
	return entry, nil
}