internal/datadog/hostmetadata/metadata.go (151 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 // Package hostmetadata is responsible for collecting host metadata from different providers // such as EC2, ECS, AWS, etc and pushing it to Datadog. package hostmetadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata" import ( "bytes" "compress/gzip" "context" "encoding/json" "fmt" "net/http" "time" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" ec2Attributes "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/ec2" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/gcp" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/clientutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata/internal/ec2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata/internal/gohai" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/hostmetadata/internal/system" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog/scrub" ) // metadataFromAttributes gets metadata info from attributes following // OpenTelemetry semantic conventions func metadataFromAttributes(attrs pcommon.Map, hostFromAttributesHandler attributes.HostFromAttributesHandler) payload.HostMetadata { hm := payload.HostMetadata{Meta: &payload.Meta{}, Tags: &payload.HostTags{}} if src, ok := attributes.SourceFromAttrs(attrs, hostFromAttributesHandler); ok && src.Kind == source.HostnameKind { hm.InternalHostname = src.Identifier hm.Meta.Hostname = src.Identifier } // AWS EC2 resource metadata cloudProvider, ok := attrs.Get(conventions.AttributeCloudProvider) switch { case ok && cloudProvider.Str() == conventions.AttributeCloudProviderAWS: ec2HostInfo := ec2Attributes.HostInfoFromAttributes(attrs) hm.Meta.InstanceID = ec2HostInfo.InstanceID hm.Meta.EC2Hostname = ec2HostInfo.EC2Hostname hm.Tags.OTel = append(hm.Tags.OTel, ec2HostInfo.EC2Tags...) case ok && cloudProvider.Str() == conventions.AttributeCloudProviderGCP: gcpHostInfo := gcp.HostInfoFromAttrs(attrs) hm.Tags.GCP = gcpHostInfo.GCPTags hm.Meta.HostAliases = append(hm.Meta.HostAliases, gcpHostInfo.HostAliases...) } return hm } func fillHostMetadata(params exporter.Settings, pcfg PusherConfig, p source.Provider, hm *payload.HostMetadata) { // Could not get hostname from attributes if hm.InternalHostname == "" { if src, err := p.Source(context.TODO()); err == nil && src.Kind == source.HostnameKind { hm.InternalHostname = src.Identifier hm.Meta.Hostname = src.Identifier } } // This information always gets filled in here // since it does not come from OTEL conventions hm.Flavor = params.BuildInfo.Command hm.Version = params.BuildInfo.Version hm.Tags.OTel = append(hm.Tags.OTel, pcfg.ConfigTags...) hm.Payload = gohai.NewPayload(params.Logger) hm.Processes = gohai.NewProcessesPayload(hm.Meta.Hostname, params.Logger) // EC2 data was not set from attributes if hm.Meta.EC2Hostname == "" { ec2HostInfo := ec2.GetHostInfo(context.Background(), params.Logger) hm.Meta.EC2Hostname = ec2HostInfo.EC2Hostname hm.Meta.InstanceID = ec2HostInfo.InstanceID } // System data was not set from attributes if hm.Meta.SocketHostname == "" { systemHostInfo := system.GetHostInfo(params.Logger) hm.Meta.SocketHostname = systemHostInfo.OS hm.Meta.SocketFqdn = systemHostInfo.FQDN } } func (p *pusher) pushMetadata(hm payload.HostMetadata) error { path := p.pcfg.MetricsEndpoint + "/intake" marshaled, err := json.Marshal(hm) if err != nil { return fmt.Errorf("error marshaling metadata payload: %w", err) } var buf bytes.Buffer g := gzip.NewWriter(&buf) if _, err = g.Write(marshaled); err != nil { return fmt.Errorf("error compressing metadata payload: %w", err) } if err = g.Close(); err != nil { return fmt.Errorf("error closing gzip writer: %w", err) } req, err := http.NewRequest(http.MethodPost, path, &buf) if err != nil { return fmt.Errorf("error creating metadata request: %w", err) } clientutil.SetDDHeaders(req.Header, p.params.BuildInfo, p.pcfg.APIKey) // Set the content type to JSON and the content encoding to gzip clientutil.SetExtraHeaders(req.Header, clientutil.JSONHeaders) resp, err := p.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 400 { return fmt.Errorf( "%q error when sending metadata payload to %s", resp.Status, path, ) } return nil } func (p *pusher) Push(_ context.Context, hm payload.HostMetadata) error { if hm.Meta.Hostname == "" { // if the hostname is empty, don't send metadata; we don't need it. p.params.Logger.Debug("Skipping host metadata since the hostname is empty") return nil } p.params.Logger.Debug("Sending host metadata payload", zap.Any("payload", hm)) _, err := p.retrier.DoWithRetries(context.Background(), func(context.Context) error { return p.pushMetadata(hm) }) return err } var _ inframetadata.Pusher = (*pusher)(nil) type pusher struct { params exporter.Settings pcfg PusherConfig retrier *clientutil.Retrier httpClient *http.Client } // NewPusher creates a new inframetadata.Pusher that pushes metadata payloads func NewPusher(params exporter.Settings, pcfg PusherConfig) inframetadata.Pusher { return &pusher{ params: params, pcfg: pcfg, retrier: clientutil.NewRetrier(params.Logger, pcfg.RetrySettings, scrub.NewScrubber()), httpClient: clientutil.NewHTTPClient(pcfg.ClientConfig), } } // RunPusher to push host metadata payloads from the host where the Collector is running periodically to Datadog intake. // This function is blocking and it is meant to be run on a goroutine. func RunPusher(ctx context.Context, params exporter.Settings, pcfg PusherConfig, p source.Provider, attrs pcommon.Map, reporter *inframetadata.Reporter) { // Push metadata every 30 minutes ticker := time.NewTicker(30 * time.Minute) defer ticker.Stop() defer params.Logger.Debug("Shut down host metadata routine") // Get host metadata from resources and fill missing info using our exporter. // Currently we only retrieve it once but still send the same payload // every 30 minutes for consistency with the Datadog Agent behavior. // // All fields that are being filled in by our exporter // do not change over time. If this ever changes `hostMetadata` // *must* be deep copied before calling `fillHostMetadata`. hostMetadata := payload.NewEmpty() if pcfg.UseResourceMetadata { hostMetadata = metadataFromAttributes(attrs, nil) } fillHostMetadata(params, pcfg, p, &hostMetadata) // Consume one first time if err := reporter.ConsumeHostMetadata(hostMetadata); err != nil { params.Logger.Warn("Failed to consume host metadata", zap.Any("payload", hostMetadata)) } for { select { case <-ctx.Done(): return case <-ticker.C: if err := reporter.ConsumeHostMetadata(hostMetadata); err != nil { params.Logger.Warn("Failed to consume host metadata", zap.Any("payload", hostMetadata)) } } } }