exporter/collector/logs.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"net/url"
"strings"
"time"
"unicode/utf8"
loggingv2 "cloud.google.com/go/logging/apiv2"
logpb "cloud.google.com/go/logging/apiv2/loggingpb"
"github.com/googleapis/gax-go/v2"
"go.uber.org/zap"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
logtypepb "google.golang.org/genproto/googleapis/logging/type"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/logsutil"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"
)
const (
defaultMaxEntrySize = 256000 // 256 KB
defaultMaxRequestSize = 10000000 // 10 MB
HTTPRequestAttributeKey = "gcp.http_request"
LogNameAttributeKey = "gcp.log_name"
SourceLocationAttributeKey = "gcp.source_location"
TraceSampledAttributeKey = "gcp.trace_sampled"
GCPTypeKey = "@type"
GCPErrorReportingTypeValue = "type.googleapis.com/google.devtools.clouderrorreporting.v1beta1.ReportedErrorEvent"
)
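// As an illustration (the attribute keys come from the constants above; the
// values here are hypothetical), a LogRecord can steer this exporter via
// attributes like:
//
//	record.Attributes().PutStr("gcp.log_name", "my-app-log")
//	record.Attributes().PutBool("gcp.trace_sampled", true)
//	record.Attributes().PutStr("gcp.http_request", `{"requestMethod":"GET","status":"200"}`)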
// severityMapping maps the integer severity level values from OTel [0-24]
// to matching Cloud Logging severity levels.
var severityMapping = []logtypepb.LogSeverity{
logtypepb.LogSeverity_DEFAULT, // Default, 0
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, //
logtypepb.LogSeverity_DEBUG, // 1-8 -> Debug
logtypepb.LogSeverity_INFO, //
logtypepb.LogSeverity_INFO, // 9-10 -> Info
logtypepb.LogSeverity_NOTICE, //
logtypepb.LogSeverity_NOTICE, // 11-12 -> Notice
logtypepb.LogSeverity_WARNING, //
logtypepb.LogSeverity_WARNING, //
logtypepb.LogSeverity_WARNING, //
logtypepb.LogSeverity_WARNING, // 13-16 -> Warning
logtypepb.LogSeverity_ERROR, //
logtypepb.LogSeverity_ERROR, //
logtypepb.LogSeverity_ERROR, //
logtypepb.LogSeverity_ERROR, // 17-20 -> Error
logtypepb.LogSeverity_CRITICAL, //
logtypepb.LogSeverity_CRITICAL, // 21-22 -> Critical
logtypepb.LogSeverity_ALERT, // 23 -> Alert
logtypepb.LogSeverity_EMERGENCY, // 24 -> Emergency
}
// otelSeverityForText maps the generic aliases of SeverityTexts to SeverityNumbers.
// This can be useful if SeverityText is manually set to one of the values from the data
// model in a way that doesn't automatically parse the SeverityNumber as well
// (see https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/issues/442).
// Otherwise, this is the mapping that is automatically used by the Stanza log severity parser
// (https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.54.0/pkg/stanza/operator/helper/severity_builder.go#L34-L57)
var otelSeverityForText = map[string]plog.SeverityNumber{
"trace": plog.SeverityNumberTrace,
"trace2": plog.SeverityNumberTrace2,
"trace3": plog.SeverityNumberTrace3,
"trace4": plog.SeverityNumberTrace4,
"debug": plog.SeverityNumberDebug,
"debug2": plog.SeverityNumberDebug2,
"debug3": plog.SeverityNumberDebug3,
"debug4": plog.SeverityNumberDebug4,
"info": plog.SeverityNumberInfo,
"info2": plog.SeverityNumberInfo2,
"info3": plog.SeverityNumberInfo3,
"info4": plog.SeverityNumberInfo4,
"warn": plog.SeverityNumberWarn,
"warn2": plog.SeverityNumberWarn2,
"warn3": plog.SeverityNumberWarn3,
"warn4": plog.SeverityNumberWarn4,
"error": plog.SeverityNumberError,
"error2": plog.SeverityNumberError2,
"error3": plog.SeverityNumberError3,
"error4": plog.SeverityNumberError4,
"fatal": plog.SeverityNumberFatal,
"fatal2": plog.SeverityNumberFatal2,
"fatal3": plog.SeverityNumberFatal3,
"fatal4": plog.SeverityNumberFatal4,
// Google Cloud Logging LogSeverity values not mapped in the previous generic aliases.
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
"default": plog.SeverityNumberUnspecified,
"notice": plog.SeverityNumberInfo3,
"warning": plog.SeverityNumberWarn,
"critical": plog.SeverityNumberFatal,
"alert": plog.SeverityNumberFatal3,
"emergency": plog.SeverityNumberFatal4,
}
type attributeProcessingError struct {
Err error
Key string
}
func (e *attributeProcessingError) Error() string {
return fmt.Sprintf("could not process attribute %s: %s", e.Key, e.Err.Error())
}
type unsupportedValueTypeError struct {
ValueType pcommon.ValueType
}
func (e *unsupportedValueTypeError) Error() string {
return fmt.Sprintf("unsupported value type %v", e.ValueType)
}
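// LogsExporter exports OTLP log data to Google Cloud Logging.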
type LogsExporter struct {
obs selfObservability
loggingClient *loggingv2.Client
cfg Config
mapper logMapper
timeout time.Duration
}
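// logMapper converts OTLP log records into Cloud Logging LogEntry protos,
// enforcing the configured per-entry and per-request size limits.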
type logMapper struct {
obs selfObservability
cfg Config
maxEntrySize int
maxRequestSize int
}
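// NewGoogleCloudLogsExporter creates a LogsExporter with the given config and
// timeout. The returned exporter does not open a client connection until Start
// is called.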
func NewGoogleCloudLogsExporter(
ctx context.Context,
cfg Config,
set exporter.Settings,
timeout time.Duration,
) (*LogsExporter, error) {
SetUserAgent(&cfg, set.BuildInfo)
obs := selfObservability{
log: set.TelemetrySettings.Logger,
meterProvider: set.TelemetrySettings.MeterProvider,
}
return &LogsExporter{
cfg: cfg,
obs: obs,
timeout: timeout,
mapper: logMapper{
obs: obs,
cfg: cfg,
maxEntrySize: defaultMaxEntrySize,
maxRequestSize: defaultMaxRequestSize,
},
}, nil
}
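// A minimal lifecycle sketch (assuming a caller-populated Config named cfg,
// exporter.Settings named set, and plog.Logs named logs; those names are
// illustrative, not part of this package):
//
//	exp, err := NewGoogleCloudLogsExporter(ctx, cfg, set, 12*time.Second)
//	if err != nil {
//		return err
//	}
//	if err := exp.Start(ctx, nil); err != nil {
//		return err
//	}
//	defer exp.Shutdown(ctx)
//	return exp.PushLogs(ctx, logs)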
// ConfigureExporter is used by integration tests to set exporter settings not visible to users.
func (l *LogsExporter) ConfigureExporter(config *logsutil.ExporterConfig) {
if config == nil {
return
}
if config.MaxEntrySize > 0 {
l.mapper.maxEntrySize = config.MaxEntrySize
}
if config.MaxRequestSize > 0 {
l.mapper.maxRequestSize = config.MaxRequestSize
}
}
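// Start creates the underlying Cloud Logging client and, if gzip compression is
// configured, attaches the gzip compressor to WriteLogEntries calls.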
func (l *LogsExporter) Start(ctx context.Context, _ component.Host) error {
clientOpts, err := generateClientOptions(ctx, &l.cfg.LogConfig.ClientConfig, &l.cfg, loggingv2.DefaultAuthScopes(), l.obs.meterProvider)
if err != nil {
return err
}
loggingClient, err := loggingv2.NewClient(ctx, clientOpts...)
if err != nil {
return err
}
if l.cfg.LogConfig.ClientConfig.Compression == gzip.Name {
loggingClient.CallOptions.WriteLogEntries = append(loggingClient.CallOptions.WriteLogEntries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}
l.loggingClient = loggingClient
// We might have modified the config when we generated options above.
// Make sure changes to the config are synced to the mapper.
l.mapper.cfg = l.cfg
return nil
}
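// Shutdown closes the underlying Cloud Logging client, if Start created one.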
func (l *LogsExporter) Shutdown(ctx context.Context) error {
if l.loggingClient != nil {
return l.loggingClient.Close()
}
return nil
}
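// PushLogs converts the incoming logs to Cloud Logging entries and writes them in
// batches, flushing a WriteLogEntriesRequest whenever adding the next entry would
// exceed the maximum request size.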
func (l *LogsExporter) PushLogs(ctx context.Context, ld plog.Logs) error {
if l.loggingClient == nil {
return errors.New("not started")
}
projectEntries, err := l.mapper.createEntries(ld)
if err != nil {
return err
}
var errs []error
for project, entries := range projectEntries {
entry := 0
currentBatchSize := 0
// Send entries in WriteRequest chunks
for len(entries) > 0 {
// default to the max request size so that when entry == len(entries) we skip the size
// check (avoiding an index panic) and force the final batch to be written
// (entry == len(entries) is the exit condition after we reassign entries = entries[entry:])
entrySize := l.mapper.maxRequestSize
if entry < len(entries) {
entrySize = proto.Size(entries[entry])
}
// this block gets skipped if we are out of entries to check
if currentBatchSize+entrySize < l.mapper.maxRequestSize {
// if adding the current entry to the current batch doesn't go over the request size,
// increase the index and account for the new request size, then continue
currentBatchSize += entrySize
entry++
continue
}
// override destination project quota for this write request, if applicable
if l.cfg.DestinationProjectQuota {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-goog-user-project": strings.TrimPrefix(project, "projects/")}))
}
// if the current entry goes over the request size (or we have gone over every entry, i.e. index=len),
// write the list up to but not including the current entry's index
_, err := l.writeLogEntries(ctx, entries[:entry])
errs = append(errs, err)
entries = entries[entry:]
entry = 0
currentBatchSize = 0
}
}
return errors.Join(errs...)
}
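// createEntries maps each LogRecord to one or more LogEntry protos, grouped by
// destination project when destination_project_quota is enabled (otherwise all
// entries share the single "" batch key). Per-record failures are joined into
// the returned error.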
func (l logMapper) createEntries(ld plog.Logs) (map[string][]*logpb.LogEntry, error) {
// if destination_project_quota is enabled, projectMapKey will be the name of the project for each batch of entries
// otherwise, we can mix project entries for more efficient batching and store all entries in a single list
projectMapKey := ""
var errs []error
entries := make(map[string][]*logpb.LogEntry)
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rl := ld.ResourceLogs().At(i)
mr := l.cfg.LogConfig.MapMonitoredResource(rl.Resource())
extraResourceLabels := attributesToUnsanitizedLabels(filterAttributes(rl.Resource().Attributes(), l.cfg.LogConfig.ServiceResourceLabels, l.cfg.LogConfig.ResourceFilters))
projectID := l.cfg.ProjectID
// override project ID with gcp.project.id, if present
if projectFromResource, found := rl.Resource().Attributes().Get(resourcemapping.ProjectIDAttributeKey); found {
projectID = projectFromResource.AsString()
}
for j := 0; j < rl.ScopeLogs().Len(); j++ {
sl := rl.ScopeLogs().At(j)
logLabels := mergeLogLabels(sl.Scope().Name(), sl.Scope().Version(), extraResourceLabels)
for k := 0; k < sl.LogRecords().Len(); k++ {
// make a copy of logLabels for each LogRecord so that shared (scope/resource) labels
// apply to every record, but labels added for one LogRecord don't leak into other
// records through the shared map reference.
entryLabels := make(map[string]string)
for k, v := range logLabels {
entryLabels[k] = v
}
log := sl.LogRecords().At(k)
// We can't simply set logName on these entries here, otherwise the conversion to the
// internal type will fail. We do, however, need the logName here to accurately calculate
// the overhead of entry metadata in case the payload needs to be split across multiple entries.
logName, err := l.getLogName(log)
if err != nil {
errs = append(errs, err)
continue
}
splitEntries, err := l.logToSplitEntries(
log,
mr,
entryLabels,
time.Now(),
logName,
projectID,
)
if err != nil {
errs = append(errs, err)
continue
}
for _, entry := range splitEntries {
if l.cfg.DestinationProjectQuota {
projectMapKey = projectID
}
if _, ok := entries[projectMapKey]; !ok {
entries[projectMapKey] = make([]*logpb.LogEntry, 0)
}
entries[projectMapKey] = append(entries[projectMapKey], entry)
}
}
}
}
return entries, errors.Join(errs...)
}
// attributesToUnsanitizedLabels converts attributes to a map[string]string.
// It ensures the label values are valid UTF-8, but does not sanitize the keys.
func attributesToUnsanitizedLabels(attrs pcommon.Map) labels {
ls := make(labels, attrs.Len())
attrs.Range(func(k string, v pcommon.Value) bool {
ls[k] = sanitizeUTF8(v.AsString())
return true
})
return ls
}
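// mergeLogLabels combines instrumentation scope labels with resource labels into
// a single label map for a LogEntry.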
func mergeLogLabels(instrumentationSource, instrumentationVersion string, resourceLabels map[string]string) map[string]string {
labelsMap := make(map[string]string)
// TODO(damemi): Make overwriting these labels (if they already exist) configurable
if len(instrumentationSource) > 0 {
labelsMap["instrumentation_source"] = instrumentationSource
}
if len(instrumentationVersion) > 0 {
labelsMap["instrumentation_version"] = instrumentationVersion
}
return mergeLabels(labelsMap, resourceLabels)
}
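// writeLogEntries sends one batch of entries to Cloud Logging with PartialSuccess
// set, bounded by the exporter timeout (12 seconds by default).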
func (l *LogsExporter) writeLogEntries(ctx context.Context, batch []*logpb.LogEntry) (*logpb.WriteLogEntriesResponse, error) {
timeout := l.timeout
if timeout <= 0 {
timeout = 12 * time.Second
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
request := &logpb.WriteLogEntriesRequest{
PartialSuccess: true,
Entries: batch,
}
// TODO(damemi): handle response code
return l.loggingClient.WriteLogEntries(ctx, request)
}
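// getLogName resolves the log name from the record's gcp.log_name attribute,
// falling back to the configured default_log_name.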
func (l logMapper) getLogName(log plog.LogRecord) (string, error) {
logNameAttr, exists := log.Attributes().Get(LogNameAttributeKey)
if exists {
return logNameAttr.AsString(), nil
}
if len(l.cfg.LogConfig.DefaultLogName) > 0 {
return l.cfg.LogConfig.DefaultLogName, nil
}
return "", fmt.Errorf("no log name provided. Set the 'default_log_name' option, or add the 'gcp.log_name' attribute to set a log name")
}
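// logToSplitEntries converts a single LogRecord into one or more LogEntry protos,
// splitting string payloads that would exceed the maximum entry size into multiple
// entries linked by a LogSplit.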
func (l logMapper) logToSplitEntries(
logRecord plog.LogRecord,
mr *monitoredrespb.MonitoredResource,
logLabels map[string]string,
processTime time.Time,
logName string,
projectID string,
) ([]*logpb.LogEntry, error) {
ts := logRecord.Timestamp().AsTime()
if logRecord.Timestamp() == 0 || ts.IsZero() {
// if timestamp is unset, fall back to observed_time_unix_nano as recommended
// (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179)
if logRecord.ObservedTimestamp() != 0 {
ts = logRecord.ObservedTimestamp().AsTime()
} else {
// if observed_time is 0, use the current time
ts = processTime
}
}
entry := &logpb.LogEntry{
Resource: mr,
Timestamp: timestamppb.New(ts),
Labels: logLabels,
LogName: fmt.Sprintf("projects/%s/logs/%s", projectID, url.PathEscape(logName)),
}
// build our own map from the OTel attributes so we don't have to call .Get() for each
// special case (.Get() ranges over all attributes each time)
attrsMap := make(map[string]pcommon.Value)
logRecord.Attributes().Range(func(k string, v pcommon.Value) bool {
attrsMap[k] = v
return true
})
// parse LogEntrySourceLocation struct from OTel attribute
if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
var logEntrySourceLocation logpb.LogEntrySourceLocation
err := unmarshalAttribute(sourceLocation, &logEntrySourceLocation)
if err != nil {
return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
}
entry.SourceLocation = &logEntrySourceLocation
delete(attrsMap, SourceLocationAttributeKey)
}
// parse TraceSampled boolean from OTel attribute or IsSampled OTLP flag
if traceSampled, ok := attrsMap[TraceSampledAttributeKey]; ok || logRecord.Flags().IsSampled() {
entry.TraceSampled = (traceSampled.Bool() || logRecord.Flags().IsSampled())
delete(attrsMap, TraceSampledAttributeKey)
}
// parse TraceID and SpanID, if present
if traceID := logRecord.TraceID(); !traceID.IsEmpty() {
entry.Trace = fmt.Sprintf("projects/%s/traces/%s", projectID, hex.EncodeToString(traceID[:]))
}
if spanID := logRecord.SpanID(); !spanID.IsEmpty() {
entry.SpanId = hex.EncodeToString(spanID[:])
}
if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok {
httpRequest, err := l.parseHTTPRequest(httpRequestAttr)
if err != nil {
l.obs.log.Debug("Unable to parse httpRequest", zap.Error(err))
}
entry.HttpRequest = httpRequest
delete(attrsMap, HTTPRequestAttributeKey)
}
if logRecord.SeverityNumber() < 0 || int(logRecord.SeverityNumber()) > len(severityMapping)-1 {
return nil, fmt.Errorf("unknown SeverityNumber %v", logRecord.SeverityNumber())
}
severityNumber := logRecord.SeverityNumber()
// Log severity levels are based on numerical values defined by OTel/GCP, which are informally mapped to generic text values such as "ALERT", "Debug", etc.
// In some cases, a SeverityText value is automatically mapped to a matching SeverityNumber.
// If not (for example, when the SeverityText is set directly on a log entry with the Transform processor),
// the SeverityText might be something like "ALERT" while the SeverityNumber is still "0".
// In that case, we attempt to map the text ourselves to one of the defined OTel SeverityNumbers.
// We do this by checking that the SeverityText is a known alias (i.e., it exists in our map) and that the SeverityNumber IS "0".
// (This also excludes other unknown/custom severity text values, which may have user-defined mappings in the collector.)
if severityForText, ok := otelSeverityForText[strings.ToLower(logRecord.SeverityText())]; ok && severityNumber == 0 {
severityNumber = severityForText
}
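// For example, a record carrying SeverityText "NOTICE" with SeverityNumber 0 is
// remapped to Info3 (11), which severityMapping translates to LogSeverity_NOTICE.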
entry.Severity = severityMapping[severityNumber]
// Format entries with severityNumber >= 17 (error and above) as GCP Error Reporting events, if enabled
if severityNumber >= 17 && l.cfg.LogConfig.ErrorReportingType {
if logRecord.Body().Type() != pcommon.ValueTypeMap {
strValue := logRecord.Body().AsString()
logRecord.Body().SetEmptyMap()
logRecord.Body().Map().PutStr("message", strValue)
}
logRecord.Body().Map().PutStr(GCPTypeKey, GCPErrorReportingTypeValue)
}
// parse remaining OTel attributes to GCP labels
for k, v := range attrsMap {
// skip "gcp.*" attributes since we process these to other fields
if strings.HasPrefix(k, "gcp.") {
continue
}
if _, ok := entry.Labels[k]; !ok {
entry.Labels[k] = v.AsString()
}
}
// Handle map and bytes as JSON-structured logs if they are successfully converted.
switch logRecord.Body().Type() {
case pcommon.ValueTypeMap:
s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
return []*logpb.LogEntry{entry}, nil
}
l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
case pcommon.ValueTypeBytes:
s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
return []*logpb.LogEntry{entry}, nil
}
l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
}
// For all other ValueTypes, export as a string payload.
// logRecord.Body().AsString() can be expensive, and we use it several times below,
// so call it once and save the result.
logBodyString := logRecord.Body().AsString()
if len(logBodyString) == 0 {
return []*logpb.LogEntry{entry}, nil
}
// Calculate the size of the internal log entry so this overhead can be accounted
// for when determining the need to split based on payload size
// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
overheadBytes := proto.Size(entry)
// Split log entries with a string payload into fewer entries
splits := int(math.Ceil(float64(len(logBodyString)) / float64(l.maxEntrySize-overheadBytes)))
if splits <= 1 {
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString}
return []*logpb.LogEntry{entry}, nil
}
entries := make([]*logpb.LogEntry, splits)
// Start by assuming all splits will be even (this may not be the case)
startIndex := 0
endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString))))
for i := 0; i < splits; i++ {
newEntry := proto.Clone(entry).(*logpb.LogEntry)
currentSplit := logBodyString[startIndex:endIndex]
// If the current split is larger than the entry size, iterate until it is within the max
// (This may happen since not all characters are exactly 1 byte)
for len(currentSplit) > l.maxEntrySize {
endIndex--
currentSplit = logBodyString[startIndex:endIndex]
}
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
newEntry.Split = &logpb.LogSplit{
Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
Index: int32(i),
TotalSplits: int32(splits),
}
entries[i] = newEntry
// Update slice indices to the next chunk
startIndex = endIndex
endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString))))
}
return entries, nil
}
// JSON keys derived from:
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#httprequest
type httpRequestLog struct {
RemoteIP string `json:"remoteIp"`
RequestURL string `json:"requestUrl"`
Latency string `json:"latency"`
Referer string `json:"referer"`
ServerIP string `json:"serverIp"`
UserAgent string `json:"userAgent"`
RequestMethod string `json:"requestMethod"`
Protocol string `json:"protocol"`
ResponseSize int64 `json:"responseSize,string"`
RequestSize int64 `json:"requestSize,string"`
CacheFillBytes int64 `json:"cacheFillBytes,string"`
Status int32 `json:"status,string"`
CacheLookup bool `json:"cacheLookup"`
CacheHit bool `json:"cacheHit"`
CacheValidatedWithOriginServer bool `json:"cacheValidatedWithOriginServer"`
}
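// parseHTTPRequest decodes the gcp.http_request attribute (JSON carried as a
// string, map, or bytes value) into a Cloud Logging HttpRequest proto. A
// hypothetical attribute value might look like:
//
//	{"requestMethod":"GET","requestUrl":"https://example.com","status":"200","latency":"150ms"}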
func (l logMapper) parseHTTPRequest(httpRequestAttr pcommon.Value) (*logtypepb.HttpRequest, error) {
var parsedHTTPRequest httpRequestLog
err := unmarshalAttribute(httpRequestAttr, &parsedHTTPRequest)
if err != nil {
return nil, &attributeProcessingError{Key: HTTPRequestAttributeKey, Err: err}
}
protocol := parsedHTTPRequest.Protocol
if protocol == "" {
// fall back to the previous hardcoded default when the attribute omits a protocol
protocol = "HTTP/1.1"
}
pb := &logtypepb.HttpRequest{
RequestMethod: parsedHTTPRequest.RequestMethod,
RequestUrl: fixUTF8(parsedHTTPRequest.RequestURL),
RequestSize: parsedHTTPRequest.RequestSize,
Status: parsedHTTPRequest.Status,
ResponseSize: parsedHTTPRequest.ResponseSize,
UserAgent: parsedHTTPRequest.UserAgent,
ServerIp: parsedHTTPRequest.ServerIP,
RemoteIp: parsedHTTPRequest.RemoteIP,
Referer: parsedHTTPRequest.Referer,
CacheHit: parsedHTTPRequest.CacheHit,
CacheValidatedWithOriginServer: parsedHTTPRequest.CacheValidatedWithOriginServer,
Protocol: protocol,
CacheFillBytes: parsedHTTPRequest.CacheFillBytes,
CacheLookup: parsedHTTPRequest.CacheLookup,
}
if parsedHTTPRequest.Latency != "" {
latency, err := time.ParseDuration(parsedHTTPRequest.Latency)
if err == nil && latency != 0 {
pb.Latency = durationpb.New(latency)
}
}
return pb, nil
}
// toProtoStruct converts v, which must marshal into a JSON object,
// into a Google Struct proto.
// Mostly copied from
// https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L577
func toProtoStruct(v any) (*structpb.Struct, error) {
// v is a Go value that supports JSON marshaling. We want a Struct
// protobuf. Some day we may have a more direct way to get there, but right
// now the only way is to marshal the Go value to JSON, unmarshal into a
// map, and then build the Struct proto from the map.
jb, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("logging: json.Marshal: %w", err)
}
var m map[string]any
err = json.Unmarshal(jb, &m)
if err != nil {
return nil, fmt.Errorf("logging: json.Unmarshal: %w", err)
}
return structpb.NewStruct(m)
}
// fixUTF8 is a helper that fixes an invalid UTF-8 string by replacing
// invalid UTF-8 runes with the Unicode replacement character (U+FFFD).
// See Issue https://github.com/googleapis/google-cloud-go/issues/1383.
// Copied from https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L557
func fixUTF8(s string) string {
if utf8.ValidString(s) {
return s
}
// Otherwise time to build the sequence.
buf := new(bytes.Buffer)
buf.Grow(len(s))
for _, r := range s {
if utf8.ValidRune(r) {
buf.WriteRune(r)
} else {
buf.WriteRune('\uFFFD')
}
}
return buf.String()
}
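// unmarshalAttribute JSON-decodes an attribute value (bytes, string, or a map
// rendered as JSON) into out.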
func unmarshalAttribute(v pcommon.Value, out any) error {
var valueBytes []byte
switch v.Type() {
case pcommon.ValueTypeBytes:
valueBytes = v.Bytes().AsRaw()
case pcommon.ValueTypeMap, pcommon.ValueTypeStr:
valueBytes = []byte(v.AsString())
default:
return &unsupportedValueTypeError{ValueType: v.Type()}
}
// TODO: Investigate doing this without the JSON unmarshal. Getting the attribute as a map
// instead of a slice of bytes could work, but would require a lot of type casting and
// type assertions.
return json.Unmarshal(valueBytes, out)
}