components/google-built-opentelemetry-collector/exporter/googleservicecontrolexporter/logs.go (362 lines of code) (raw):

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package googleservicecontrolexporter

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
	"unicode/utf8"

	scpb "cloud.google.com/go/servicecontrol/apiv1/servicecontrolpb"
	"github.com/pborman/uuid"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/zap"
	logtypepb "google.golang.org/genproto/googleapis/logging/type"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

const (
	// defaultMaxEntrySize caps the serialized size of a single log entry;
	// larger entries are rejected (Service Control does not support splits).
	defaultMaxEntrySize = 256 * 1024 // 256 KB
	// LogNameAttributeKey is the OTel attribute that overrides the log name.
	LogNameAttributeKey = "log_name"
	// InsertIdAttributeKey is the OTel attribute carrying a caller-supplied
	// LogEntry insert id.
	// TODO: we should evaluate if this is needed; if not, can remove this and
	// let the API generate a UUID - same behavior as the Cloud Logging exporter
	InsertIdAttributeKey = "logging.googleapis.com/insertId"
	// SourceLocationAttributeKey is the OTel attribute carrying a serialized
	// LogEntrySourceLocation.
	// FluentBit exporter uses logging.googleapis.com/* prefix and Cloud Logging
	// exporter uses `gcp.*` prefix
	SourceLocationAttributeKey = "logging.googleapis.com/sourceLocation"
	// HTTPRequestAttributeKey is the OTel attribute carrying a serialized
	// HttpRequest payload.
	HTTPRequestAttributeKey = "httpRequest"
	// LogDefaultOperationName is the default Operation name for log reports.
	LogDefaultOperationName = "log_entry"
)

// severityMapping maps the integer severity level values from OTel [0-24]
// to matching Cloud Logging severity levels.
// Service Control' severity uses logtypepb's severity levels, so this mapping // is exactly the same as Cloud Logging exporter's severity mapping. func severityMapping(severityNumber plog.SeverityNumber) (logtypepb.LogSeverity, error) { switch { case severityNumber == 0: return logtypepb.LogSeverity_DEFAULT, nil case 1 <= severityNumber && severityNumber <= 8: return logtypepb.LogSeverity_DEBUG, nil case 9 <= severityNumber && severityNumber <= 10: return logtypepb.LogSeverity_INFO, nil case 11 <= severityNumber && severityNumber <= 12: return logtypepb.LogSeverity_NOTICE, nil case 13 <= severityNumber && severityNumber <= 16: return logtypepb.LogSeverity_WARNING, nil case 17 <= severityNumber && severityNumber <= 20: return logtypepb.LogSeverity_ERROR, nil case 21 <= severityNumber && severityNumber <= 22: return logtypepb.LogSeverity_CRITICAL, nil case severityNumber == 23: return logtypepb.LogSeverity_ALERT, nil case severityNumber == 24: return logtypepb.LogSeverity_EMERGENCY, nil default: return logtypepb.LogSeverity_DEFAULT, fmt.Errorf("unknown severity number %d", severityNumber) } } // otelSeverityForText maps the generic aliases of SeverityTexts to SeverityNumbers. 
// This can be useful if SeverityText is manually set to one of the values from the data // model in a way that doesn't automatically parse the SeverityNumber as well // (see https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/issues/442) // Otherwise, this is the mapping that is automatically used by the Stanza log severity parser // (https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.54.0/pkg/stanza/operator/helper/severity_builder.go#L34-L57) var otelSeverityForText = map[string]plog.SeverityNumber{ "trace": plog.SeverityNumberTrace, "trace2": plog.SeverityNumberTrace2, "trace3": plog.SeverityNumberTrace3, "trace4": plog.SeverityNumberTrace4, "debug": plog.SeverityNumberDebug, "debug2": plog.SeverityNumberDebug2, "debug3": plog.SeverityNumberDebug3, "debug4": plog.SeverityNumberDebug4, "info": plog.SeverityNumberInfo, "info2": plog.SeverityNumberInfo2, "info3": plog.SeverityNumberInfo3, "info4": plog.SeverityNumberInfo4, "warn": plog.SeverityNumberWarn, "warn2": plog.SeverityNumberWarn2, "warn3": plog.SeverityNumberWarn3, "warn4": plog.SeverityNumberWarn4, "error": plog.SeverityNumberError, "error2": plog.SeverityNumberError2, "error3": plog.SeverityNumberError3, "error4": plog.SeverityNumberError4, "fatal": plog.SeverityNumberFatal, "fatal2": plog.SeverityNumberFatal2, "fatal3": plog.SeverityNumberFatal3, "fatal4": plog.SeverityNumberFatal4, } type attributeProcessingError struct { Err error Key string } func (e *attributeProcessingError) Error() string { return fmt.Sprintf("could not process attribute %s: %s", e.Key, e.Err.Error()) } type unsupportedValueTypeError struct { ValueType pcommon.ValueType } func (e *unsupportedValueTypeError) Error() string { return fmt.Sprintf("unsupported value type %v", e.ValueType) } type LogsExporter struct { *Exporter logMapper logMapper } type logMapper struct { logger *zap.SugaredLogger cfg Config maxEntrySize int } // NewLogsExporter returns service control logs exporter func 
NewLogsExporter(config Config, logger *zap.Logger, c ServiceControlClient, tel component.TelemetrySettings) *LogsExporter { e := newExporter(config, logger, c, tel) return &LogsExporter{ Exporter: e, logMapper: logMapper{ logger: logger.Sugar(), cfg: config, maxEntrySize: defaultMaxEntrySize, }, } } func (e *LogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { req, err := e.createReportRequest(ld) if err != nil { return err } if len(req.Operations) == 0 { // Nothing to export. return nil } return e.pushReportRequest(ctx, req) } func (e *LogsExporter) createReportRequest(ld plog.Logs) (*scpb.ReportRequest, error) { now := time.Now() request := scpb.ReportRequest{ Operations: make([]*scpb.Operation, 0), ServiceConfigId: e.serviceConfigID, ServiceName: e.serviceName, } //TODO(lujieduan): in FluentBit, the exporter would create a monitored // resources to log entries map, and create one "Operation" to contain all // entries of each MR. // This could be problematic if there are too many logs in one batch, either // within a single MR, or across all MRs, causing the request to be larger // than the API size limit. // Instead, we should look into split them into separate requests for i := range ld.ResourceLogs().Len() { rl := ld.ResourceLogs().At(i) ops, err := e.createRequestOperation(rl, now) if err != nil { return nil, err } request.Operations = append(request.Operations, ops) } return &request, nil } func (e *LogsExporter) createRequestOperation(rl plog.ResourceLogs, now time.Time) (*scpb.Operation, error) { le, mr, consumerId, err := e.createEntries(rl) if err != nil { return nil, err } op := scpb.Operation{ ConsumerId: consumerId, OperationName: e.logMapper.cfg.LogConfig.OperationName, // Ensure start_time < end_time: // https://yaqs.corp.google.com/eng/q/5422158029493633024. // Keep start_time = now - 1ms. 
StartTime: timestamppb.New(now.Add(-1 * time.Second)), EndTime: timestamppb.New(now), OperationId: uuid.New(), Labels: mr, LogEntries: le, } return &op, nil } func (e *LogsExporter) createEntries(rl plog.ResourceLogs) ([]*scpb.LogEntry, map[string]string, string, error) { var errs []error logCount := rl.ScopeLogs().Len() entries := make([]*scpb.LogEntry, 0, logCount) processTime := time.Now() resourceAttributes, consumerId := e.parseResourceAttributes(rl.Resource()) for j := range logCount { sl := rl.ScopeLogs().At(j) // TODO(lujieduan): handle otel instrumentation scope labels, i.e., // instrumentation_source and instrumentation_version for k := range sl.LogRecords().Len() { logRecord := sl.LogRecords().At(k) entry, err := e.logMapper.parseLogEntry(logRecord, processTime) if err != nil { errs = append(errs, err) continue } entries = append(entries, entry) } } return entries, resourceAttributes, consumerId, errors.Join(errs...) } func (l logMapper) getLogName(log plog.LogRecord) (string, error) { logNameAttr, exists := log.Attributes().Get(LogNameAttributeKey) if exists { return logNameAttr.AsString(), nil } if len(l.cfg.LogConfig.DefaultLogName) > 0 { return l.cfg.LogConfig.DefaultLogName, nil } return "", fmt.Errorf("encountered log without 'log_name' field, while 'default_log_name' value in configuration is empty") } // parseLogEntry creates a Service Control LogEntry from otel LogRecord. Service // Control API does not support log splits, so one otel log will always be one // log entry. 
func (l logMapper) parseLogEntry(logRecord plog.LogRecord, processTime time.Time) (*scpb.LogEntry, error) { ts := logRecord.Timestamp().AsTime() if logRecord.Timestamp() == 0 || ts.IsZero() { // if timestamp is unset, fall back to observed_time_unix_nano as recommended // (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179) if logRecord.ObservedTimestamp() != 0 { ts = logRecord.ObservedTimestamp().AsTime() } else { // if observed_time is 0, use the process time, which is the current time ts = processTime } } logName, err := l.getLogName(logRecord) if err != nil { return nil, err } entry := &scpb.LogEntry{ Name: logName, Timestamp: timestamppb.New(ts), Labels: map[string]string{}, } // build our own map off OTel attributes so we don't have to call .Get() for each special case // (.Get() ranges over all attributes each time) attrsMap := make(map[string]pcommon.Value) logRecord.Attributes().Range(func(k string, v pcommon.Value) bool { attrsMap[k] = v return true }) // Parse LogEntry InsertId struct from OTel attribute // TODO(lujieduan): we should evaluate if we need to parse insertId from // the logs: the FluentBit plugin parses this but Otel Cloud Logging // exporter does not and let API assigns the InsertId if insertIdAttr, ok := attrsMap[InsertIdAttributeKey]; ok { entry.InsertId = insertIdAttr.AsString() delete(attrsMap, InsertIdAttributeKey) } // When insertId is not present in the attributes, FluentBit would generate // UUIDs in the exporter; here we would just leave it blank and let server // assigns new UUIDs - same end results, easy for testing // parse LogEntrySourceLocation struct from OTel attribute if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok { var logEntrySourceLocation scpb.LogEntrySourceLocation err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation) if err != nil { return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err} } 
entry.SourceLocation = &logEntrySourceLocation delete(attrsMap, SourceLocationAttributeKey) } // parse HttpRequest if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok { httpRequest, err := l.parseHTTPRequest(httpRequestAttr) if err != nil { l.logger.Warn("Unable to parse httpRequest", zap.Error(err)) } entry.HttpRequest = httpRequest delete(attrsMap, HTTPRequestAttributeKey) } // parse Severity severityNumber := logRecord.SeverityNumber() // Log severity levels are based on numerical values defined by Otel/GCP, which are informally mapped to generic text values such as "ALERT", "Debug", etc. // In some cases, a SeverityText value can be automatically mapped to a matching SeverityNumber. // If not (for example, when directly setting the SeverityText on a Log entry with the Transform processor), then the // SeverityText might be something like "ALERT" while the SeverityNumber is still "0". // In this case, we will attempt to map the text ourselves to one of the defined Otel SeverityNumbers. // We do this by checking that the SeverityText is NOT "default" (ie, it exists in our map) and that the SeverityNumber IS "0". // (This also excludes other unknown/custom severity text values, which may have user-defined mappings in the collector) if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 { severityNumber = severityForText } entry.Severity, err = severityMapping(severityNumber) if err != nil { l.logger.Warn(fmt.Errorf("error parsing severity %v with error: %s", logRecord.SeverityNumber(), err)) } // parse remaining OTel attributes to GCP labels for k, v := range attrsMap { if k == LogNameAttributeKey { continue } if _, ok := entry.Labels[k]; !ok { entry.Labels[k] = v.AsString() } } // Handle map and bytes as JSON-structured logs if they are successfully converted. 
switch logRecord.Body().Type() { case pcommon.ValueTypeMap: s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw()) if err == nil { entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s} return entry, nil } l.logger.Debug(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err)) case pcommon.ValueTypeBytes: s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw()) if err == nil { entry.Payload = &scpb.LogEntry_StructPayload{StructPayload: s} return entry, nil } l.logger.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err)) } // Fields: LogEntry.trace, LogEntry.operation, LogEntry.protoPayload // are not parsed // Service Control LogEntry does not contain: traceId, SpanId, traceSampled logBodyString := logRecord.Body().AsString() if len(logBodyString) == 0 { return entry, nil } // Service Control LogEntry representation does not support // splits. In FluentBit, long log entries are dropped. 
overheadBytes := proto.Size(entry) if (len([]byte(logBodyString)) + overheadBytes) > l.maxEntrySize { return nil, fmt.Errorf("entry size is too big: got: %d bytes, want: < %d bytes; timestamp: %s", len([]byte(logBodyString))+overheadBytes, l.maxEntrySize, entry.Timestamp) } entry.Payload = &scpb.LogEntry_TextPayload{TextPayload: logBodyString} return entry, nil } // JSON keys derived from: // https://cloud.google.com/service-infrastructure/docs/service-control/reference/rest/v1/Operation#httprequest type httpRequestLog struct { RemoteIP string `json:"remoteIp"` RequestURL string `json:"requestUrl"` Latency string `json:"latency"` Referer string `json:"referer"` ServerIP string `json:"serverIp"` UserAgent string `json:"userAgent"` RequestMethod string `json:"requestMethod"` Protocol string `json:"protocol"` ResponseSize int64 `json:"responseSize,string"` RequestSize int64 `json:"requestSize,string"` CacheFillBytes int64 `json:"cacheFillBytes,string"` Status int32 `json:"status,string"` CacheLookup bool `json:"cacheLookup"` CacheHit bool `json:"cacheHit"` CacheValidatedWithOriginServer bool `json:"cacheValidatedWithOriginServer"` } func (l logMapper) parseHTTPRequest(httpRequestAttr pcommon.Value) (*scpb.HttpRequest, error) { var parsedHTTPRequest httpRequestLog err := unmarshalAttribute(httpRequestAttr, &parsedHTTPRequest) if err != nil { return nil, &attributeProcessingError{Key: HTTPRequestAttributeKey, Err: err} } pb := &scpb.HttpRequest{ RequestMethod: parsedHTTPRequest.RequestMethod, RequestUrl: fixUTF8(parsedHTTPRequest.RequestURL), RequestSize: parsedHTTPRequest.RequestSize, Status: parsedHTTPRequest.Status, ResponseSize: parsedHTTPRequest.ResponseSize, UserAgent: parsedHTTPRequest.UserAgent, ServerIp: parsedHTTPRequest.ServerIP, RemoteIp: parsedHTTPRequest.RemoteIP, Referer: parsedHTTPRequest.Referer, CacheHit: parsedHTTPRequest.CacheHit, CacheValidatedWithOriginServer: parsedHTTPRequest.CacheValidatedWithOriginServer, Protocol: parsedHTTPRequest.Protocol, 
CacheFillBytes: parsedHTTPRequest.CacheFillBytes, CacheLookup: parsedHTTPRequest.CacheLookup, } if parsedHTTPRequest.Latency != "" { latency, err := time.ParseDuration(parsedHTTPRequest.Latency) if err == nil && latency != 0 { pb.Latency = durationpb.New(latency) } } return pb, nil } // toProtoStruct converts v, which must marshal into a JSON object, // into a Google Struct proto. // Mostly copied from // https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L577 func toProtoStruct(v any) (*structpb.Struct, error) { // v is a Go value that supports JSON marshaling. We want a Struct // protobuf. Some day we may have a more direct way to get there, but right // now the only way is to marshal the Go value to JSON, unmarshal into a // map, and then build the Struct proto from the map. jb, err := json.Marshal(v) if err != nil { return nil, fmt.Errorf("logging: json.Marshal: %w", err) } var m map[string]any err = json.Unmarshal(jb, &m) if err != nil { return nil, fmt.Errorf("logging: json.Unmarshal: %w", err) } return structpb.NewStruct(m) } // fixUTF8 is a helper that fixes an invalid UTF-8 string by replacing // invalid UTF-8 runes with the Unicode replacement character (U+FFFD). // See Issue https://github.com/googleapis/google-cloud-go/issues/1383. // Coped from https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L557 func fixUTF8(s string) string { if utf8.ValidString(s) { return s } // Otherwise time to build the sequence. 
buf := new(bytes.Buffer) buf.Grow(len(s)) for _, r := range s { if utf8.ValidRune(r) { buf.WriteRune(r) } else { buf.WriteRune('\uFFFD') } } return buf.String() } func unmarshalAttribute(v pcommon.Value, out any) error { var valueBytes []byte switch v.Type() { case pcommon.ValueTypeBytes: valueBytes = v.Bytes().AsRaw() case pcommon.ValueTypeMap, pcommon.ValueTypeStr: valueBytes = []byte(v.AsString()) default: return &unsupportedValueTypeError{ValueType: v.Type()} } // TODO: Investigate doing this without the JSON unmarshal. Getting the attribute as a map // instead of a slice of bytes could do, but would need a lot of type casting and checking // assertions with it. return json.Unmarshal(valueBytes, out) }