extension/sumologicextension/extension.go (879 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sumologicextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension"
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"regexp"
"runtime"
"strings"
"sync"
"time"
"github.com/Showmax/go-fqdn"
"github.com/cenkalti/backoff/v4"
"github.com/shirou/gopsutil/v4/host"
"github.com/shirou/gopsutil/v4/process"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensionauth"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension/api"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension/credentials"
)
type SumologicExtension struct {
collectorName string
buildVersion string
// The lock around baseURL is needed because sumologicexporter is using
// it as base URL for API requests and this access has to be coordinated.
baseURLLock sync.RWMutex
baseURL string
credsNotifyLock sync.Mutex
credsNotifyUpdate chan struct{}
host component.Host
conf *Config
origLogger *zap.Logger
logger *zap.Logger
credentialsStore credentials.Store
hashKey string
httpClient *http.Client
registrationInfo api.OpenRegisterResponsePayload
updateMetadata bool
stickySessionCookieLock sync.RWMutex
stickySessionCookie string
closeChan chan struct{}
closeOnce sync.Once
backOff *backoff.ExponentialBackOff
id component.ID
}
const (
heartbeatURL = "/api/v1/collector/heartbeat"
metadataURL = "/api/v1/otCollectors/metadata"
registerURL = "/api/v1/collector/register"
collectorIDField = "collector_id"
collectorNameField = "collector_name"
collectorCredentialIDField = "collector_credential_id"
stickySessionKey = "AWSALB"
activeMQJavaProcess = "activemq.jar"
cassandraJavaProcess = "org.apache.cassandra.service.CassandraDaemon"
dockerDesktopJavaProcess = "com.docker.backend"
jmxJavaProcess = "com.sun.management.jmxremote"
)
const (
updateCollectorMetadataID = "extension.sumologic.updateCollectorMetadata"
updateCollectorMetadataStage = featuregate.StageAlpha
DefaultHeartbeatInterval = 15 * time.Second
)
var updateCollectorMetadataFeatureGate *featuregate.Gate
func init() {
updateCollectorMetadataFeatureGate = featuregate.GlobalRegistry().MustRegister(
updateCollectorMetadataID,
updateCollectorMetadataStage,
featuregate.WithRegisterDescription("When enabled, the collector will update its Sumo Logic metadata on startup."),
featuregate.WithRegisterReferenceURL("https://github.com/SumoLogic/sumologic-otel-collector/pull/858"),
)
}
// SumologicExtension implements extensionauth.HTTPClient
var (
_ extension.Extension = (*SumologicExtension)(nil)
_ extensionauth.HTTPClient = (*SumologicExtension)(nil)
)
func newSumologicExtension(conf *Config, logger *zap.Logger, id component.ID, buildVersion string) (*SumologicExtension, error) {
if conf.Credentials.InstallationToken == "" {
return nil, errors.New("access credentials not provided: need installation_token")
}
hostname, err := getHostname(logger)
if err != nil {
return nil, err
}
credentialsStore, err := credentials.NewLocalFsStore(
credentials.WithCredentialsDirectory(conf.CollectorCredentialsDirectory),
credentials.WithLogger(logger),
)
if err != nil {
return nil, fmt.Errorf("failed to initialize credentials store: %w", err)
}
var (
collectorName string
hashKey = createHashKey(conf)
)
if conf.CollectorName == "" {
// If collector name is not set by the user, check if the collector was restarted
// and that we can reuse collector name save in credentials store.
if creds, err := credentialsStore.Get(hashKey); err != nil {
// If credentials file is not stored on filesystem generate collector name
collectorName = hostname
} else {
collectorName = creds.CollectorName
}
} else {
collectorName = conf.CollectorName
}
if conf.HeartBeatInterval <= 0 {
conf.HeartBeatInterval = DefaultHeartbeatInterval
}
// Prepare ExponentialBackoff
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = conf.BackOff.InitialInterval
backOff.MaxElapsedTime = conf.BackOff.MaxElapsedTime
backOff.MaxInterval = conf.BackOff.MaxInterval
return &SumologicExtension{
collectorName: collectorName,
buildVersion: buildVersion,
baseURL: strings.TrimSuffix(conf.APIBaseURL, "/"),
credsNotifyUpdate: make(chan struct{}),
conf: conf,
origLogger: logger,
logger: logger,
hashKey: hashKey,
credentialsStore: credentialsStore,
updateMetadata: updateCollectorMetadataFeatureGate.IsEnabled(),
closeChan: make(chan struct{}),
backOff: backOff,
id: id,
}, nil
}
func createHashKey(conf *Config) string {
return fmt.Sprintf("%s%s%s",
conf.CollectorName,
conf.Credentials.InstallationToken,
strings.TrimSuffix(conf.APIBaseURL, "/"),
)
}
func (se *SumologicExtension) Start(ctx context.Context, host component.Host) error {
var err error
se.host = host
// if force registration is not enabled, verify that the store is correctly configured
if !se.conf.ForceRegistration {
err = se.credentialsStore.Validate()
if err != nil {
return err
}
}
colCreds, err := se.getCredentials(ctx)
if err != nil {
return err
}
if err = se.injectCredentials(ctx, colCreds); err != nil {
return err
}
// Add logger fields based on actual collector name and ID.
se.logger = se.origLogger.With(
zap.String(collectorNameField, colCreds.Credentials.CollectorName),
zap.String(collectorIDField, colCreds.Credentials.CollectorID),
)
if se.updateMetadata {
err = se.updateMetadataWithBackoff(ctx)
if err != nil {
return err
}
}
go se.heartbeatLoop()
return nil
}
// Shutdown is invoked during service shutdown.
func (se *SumologicExtension) Shutdown(ctx context.Context) error {
se.closeOnce.Do(func() { close(se.closeChan) })
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
func (se *SumologicExtension) validateCredentials(
ctx context.Context,
colCreds credentials.CollectorCredentials,
) error {
se.logger.Info("Validating collector credentials...",
zap.String(collectorCredentialIDField, colCreds.Credentials.CollectorCredentialID),
zap.String(collectorIDField, colCreds.Credentials.CollectorID),
)
if err := se.injectCredentials(ctx, colCreds); err != nil {
return err
}
se.backOff.Reset()
var err error
for {
err = se.sendHeartbeatWithHTTPClient(ctx, se.httpClient)
if errors.Is(err, errUnauthorizedHeartbeat) || err == nil {
return err
}
nbo := se.backOff.NextBackOff()
var backOffErr *backoff.PermanentError
// Return error if backoff reaches the limit or uncoverable error is spotted
if ok := errors.As(err, &backOffErr); nbo == se.backOff.Stop || ok {
return err
}
se.logger.Info(fmt.Sprintf("Retrying credentials validation due to error %s", err))
t := time.NewTimer(nbo)
defer t.Stop()
select {
case <-t.C:
case <-ctx.Done():
return fmt.Errorf("credential validation cancelled: %w", ctx.Err())
}
}
}
// injectCredentials injects the collector credentials:
// - into registration info that's stored in the extension and can be used by roundTripper
// - into http client and its transport so that each request is using collector
// credentials as authentication keys
func (se *SumologicExtension) injectCredentials(ctx context.Context, colCreds credentials.CollectorCredentials) error {
se.credsNotifyLock.Lock()
defer se.credsNotifyLock.Unlock()
// Set the registration info so that it can be used in RoundTripper.
se.registrationInfo = colCreds.Credentials
httpClient, err := se.getHTTPClient(ctx, se.conf.ClientConfig, colCreds.Credentials)
if err != nil {
return err
}
se.httpClient = httpClient
// Let components know that the credentials may have changed.
close(se.credsNotifyUpdate)
se.credsNotifyUpdate = make(chan struct{})
return nil
}
func (se *SumologicExtension) getHTTPClient(
ctx context.Context,
httpClientSettings confighttp.ClientConfig,
_ api.OpenRegisterResponsePayload,
) (*http.Client, error) {
httpClient, err := httpClientSettings.ToClient(
ctx,
se.host,
componenttest.NewNopTelemetrySettings(),
)
if err != nil {
return nil, fmt.Errorf("couldn't create HTTP client: %w", err)
}
// Set the transport so that all requests from httpClient will contain
// the collector credentials.
httpClient.Transport, err = se.RoundTripper(httpClient.Transport)
if err != nil {
return nil, fmt.Errorf("couldn't create HTTP client transport: %w", err)
}
return httpClient, nil
}
// getCredentials retrieves the credentials for the collector.
// It does so by checking the local credentials store and by validating those credentials.
// In case they are invalid or are not available through local credentials store
// then it tries to register the collector using the provided access keys.
func (se *SumologicExtension) getCredentials(ctx context.Context) (credentials.CollectorCredentials, error) {
var (
colCreds credentials.CollectorCredentials
err error
)
if !se.conf.ForceRegistration {
colCreds, err = se.getLocalCredentials(ctx)
if err == nil {
errV := se.validateCredentials(ctx, colCreds)
if errV == nil {
se.logger.Info("Found stored credentials, skipping registration",
zap.String(collectorNameField, colCreds.Credentials.CollectorName),
)
return colCreds, nil
}
// We are unable to confirm if credentials are valid or not as we do not have (clear) response from the API
if !errors.Is(errV, errUnauthorizedHeartbeat) {
return credentials.CollectorCredentials{}, errV
}
// Credentials might have ended up being invalid or the collector
// might have been removed in Sumo.
// Fall back to removing the credentials and recreating them by registering
// the collector.
if err = se.credentialsStore.Delete(se.hashKey); err != nil {
se.logger.Error(
"Unable to delete old collector credentials", zap.Error(err),
)
}
se.logger.Info("Locally stored credentials invalid. Trying to re-register...",
zap.String(collectorNameField, colCreds.Credentials.CollectorName),
zap.String(collectorIDField, colCreds.Credentials.CollectorID),
zap.Error(errV),
)
} else {
se.logger.Info("Locally stored credentials not found, registering the collector")
}
}
colCreds, err = se.getCredentialsByRegistering(ctx)
if err != nil {
return credentials.CollectorCredentials{}, err
}
return colCreds, nil
}
// getCredentialsByRegistering registers the collector and returns the credentials
// obtained from the API.
func (se *SumologicExtension) getCredentialsByRegistering(ctx context.Context) (credentials.CollectorCredentials, error) {
colCreds, err := se.registerCollectorWithBackoff(ctx, se.collectorName)
if err != nil {
return credentials.CollectorCredentials{}, err
}
if err := se.credentialsStore.Store(se.hashKey, colCreds); err != nil {
se.logger.Error(
"Unable to store collector credentials, they will be used now but won't be re-used on next run",
zap.Error(err),
)
}
se.collectorName = colCreds.CollectorName
return colCreds, nil
}
// getLocalCredentials returns the credentials retrieved from local credentials
// storage in case they are available there.
func (se *SumologicExtension) getLocalCredentials(_ context.Context) (credentials.CollectorCredentials, error) {
colCreds, err := se.credentialsStore.Get(se.hashKey)
if err != nil {
return credentials.CollectorCredentials{},
fmt.Errorf("problem finding local collector credentials (hash key: %s): %w",
se.hashKey, err,
)
}
se.collectorName = colCreds.CollectorName
if colCreds.APIBaseURL != "" {
se.SetBaseURL(colCreds.APIBaseURL)
}
return colCreds, nil
}
// registerCollector registers the collector using registration API and returns
// the obtained collector credentials.
func (se *SumologicExtension) registerCollector(ctx context.Context, collectorName string) (credentials.CollectorCredentials, error) {
u, err := url.Parse(se.BaseURL())
if err != nil {
return credentials.CollectorCredentials{}, err
}
u.Path = registerURL
hostname, err := getHostname(se.logger)
if err != nil {
return credentials.CollectorCredentials{}, fmt.Errorf("cannot get hostname: %w", err)
}
var buff bytes.Buffer
if err = json.NewEncoder(&buff).Encode(api.OpenRegisterRequestPayload{
CollectorName: collectorName,
Description: se.conf.CollectorDescription,
Category: se.conf.CollectorCategory,
Fields: se.conf.CollectorFields,
Hostname: hostname,
Ephemeral: se.conf.Ephemeral,
Clobber: se.conf.Clobber,
TimeZone: se.conf.TimeZone,
}); err != nil {
return credentials.CollectorCredentials{}, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), &buff)
if err != nil {
return credentials.CollectorCredentials{}, err
}
addClientCredentials(req,
se.conf.Credentials,
)
addJSONHeaders(req)
se.logger.Info("Calling register API", zap.String("URL", u.String()))
client := *http.DefaultClient
client.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
return http.ErrUseLastResponse
}
res, err := client.Do(req)
if err != nil {
se.logger.Warn("Collector registration HTTP request failed", zap.Error(err))
return credentials.CollectorCredentials{}, fmt.Errorf("failed to register the collector: %w", err)
}
defer res.Body.Close()
if res.StatusCode < 200 || res.StatusCode >= 400 {
return credentials.CollectorCredentials{}, se.handleRegistrationError(res)
} else if res.StatusCode == http.StatusMovedPermanently {
// Use the URL from Location header for subsequent requests.
u := strings.TrimSuffix(res.Header.Get("Location"), "/")
se.SetBaseURL(u)
se.logger.Info("Redirected to a different deployment",
zap.String("url", u),
)
return se.registerCollector(ctx, collectorName)
}
var resp api.OpenRegisterResponsePayload
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return credentials.CollectorCredentials{}, err
}
if collectorName != resp.CollectorName {
se.logger.Warn("Collector name already in use, registered modified name", zap.String("registered_name", resp.CollectorName))
}
return credentials.CollectorCredentials{
CollectorName: collectorName,
Credentials: resp,
APIBaseURL: se.BaseURL(),
}, nil
}
// handleRegistrationError handles the collector registration errors and returns
// appropriate error for backoff handling and logging purposes.
func (se *SumologicExtension) handleRegistrationError(res *http.Response) error {
var errResponse api.ErrorResponsePayload
if err := json.NewDecoder(res.Body).Decode(&errResponse); err != nil {
var buff bytes.Buffer
if _, errCopy := io.Copy(&buff, res.Body); errCopy != nil {
return fmt.Errorf(
"failed to read the collector registration response body, status code: %d, err: %w",
res.StatusCode, errCopy,
)
}
return fmt.Errorf(
"failed to decode collector registration response body: %s, status code: %d, err: %w",
buff.String(), res.StatusCode, err,
)
}
se.logger.Warn("Collector registration failed",
zap.Int("status_code", res.StatusCode),
zap.String("error_id", errResponse.ID),
zap.Any("errors", errResponse.Errors),
)
// Return unrecoverable error for 4xx status codes except 429
if res.StatusCode >= 400 && res.StatusCode < 500 && res.StatusCode != http.StatusTooManyRequests {
return backoff.Permanent(fmt.Errorf(
"failed to register the collector, got HTTP status code: %d",
res.StatusCode,
))
}
return fmt.Errorf(
"failed to register the collector, got HTTP status code: %d", res.StatusCode,
)
}
// callRegisterWithBackoff calls registration using exponential backoff algorithm
// this loosely base on backoff.Retry function
func (se *SumologicExtension) registerCollectorWithBackoff(ctx context.Context, collectorName string) (credentials.CollectorCredentials, error) {
se.backOff.Reset()
for {
creds, err := se.registerCollector(ctx, collectorName)
if err == nil {
se.logger = se.origLogger.With(
zap.String(collectorNameField, creds.Credentials.CollectorName),
zap.String(collectorIDField, creds.Credentials.CollectorID),
)
se.logger.Info("Collector registration finished successfully")
return creds, nil
}
nbo := se.backOff.NextBackOff()
var backOffErr *backoff.PermanentError
// Return error if backoff reaches the limit or uncoverable error is spotted
if ok := errors.As(err, &backOffErr); nbo == se.backOff.Stop || ok {
return credentials.CollectorCredentials{}, fmt.Errorf("collector registration failed: %w", err)
}
t := time.NewTimer(nbo)
defer t.Stop()
select {
case <-t.C:
case <-ctx.Done():
return credentials.CollectorCredentials{}, fmt.Errorf("collector registration cancelled: %w", ctx.Err())
}
}
}
func (se *SumologicExtension) heartbeatLoop() {
if se.registrationInfo.CollectorCredentialID == "" || se.registrationInfo.CollectorCredentialKey == "" {
se.logger.Error("Collector not registered, cannot send heartbeat")
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
// When the close channel is closed ...
<-se.closeChan
// ... cancel the ongoing heartbeat request.
cancel()
}()
se.logger.Info("Heartbeat loop initialized. Starting to send heartbeat requests")
timer := time.NewTimer(se.conf.HeartBeatInterval)
for {
select {
case <-se.closeChan:
se.logger.Info("Heartbeat sender turned off")
return
default:
err := se.sendHeartbeatWithHTTPClient(ctx, se.httpClient)
if err != nil {
if errors.Is(err, errUnauthorizedHeartbeat) {
se.logger.Warn("Heartbeat request unauthorized, re-registering the collector")
var colCreds credentials.CollectorCredentials
colCreds, err = se.getCredentialsByRegistering(ctx)
if err != nil {
se.logger.Error("Heartbeat error, cannot register the collector", zap.Error(err))
continue
}
// Inject newly received credentials into extension's configuration.
if err = se.injectCredentials(ctx, colCreds); err != nil {
se.logger.Error("Heartbeat error, cannot inject new collector credentials", zap.Error(err))
continue
}
// Overwrite old logger fields with new collector name and ID.
se.logger = se.origLogger.With(
zap.String(collectorNameField, colCreds.Credentials.CollectorName),
zap.String(collectorIDField, colCreds.Credentials.CollectorID),
)
} else {
se.logger.Error("Heartbeat error", zap.Error(err))
}
} else {
se.logger.Debug("Heartbeat sent")
}
select {
case <-timer.C:
timer.Stop()
timer.Reset(se.conf.HeartBeatInterval)
case <-se.closeChan:
}
}
}
}
var (
errUnauthorizedHeartbeat = errors.New("heartbeat unauthorized")
errUnauthorizedMetadata = errors.New("metadata update unauthorized")
)
type ErrorAPI struct {
status int
body string
}
func (e ErrorAPI) Error() string {
return fmt.Sprintf("API error (status code: %d): %s", e.status, e.body)
}
func (se *SumologicExtension) sendHeartbeatWithHTTPClient(ctx context.Context, httpClient *http.Client) error {
u, err := url.Parse(se.BaseURL() + heartbeatURL)
if err != nil {
return fmt.Errorf("unable to parse heartbeat URL %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
if err != nil {
return fmt.Errorf("unable to create HTTP request %w", err)
}
addJSONHeaders(req)
res, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("unable to send HTTP request: %w", err)
}
defer res.Body.Close()
switch res.StatusCode {
default:
var buff bytes.Buffer
if _, err := io.Copy(&buff, res.Body); err != nil {
return fmt.Errorf(
"failed to copy collector heartbeat response body, status code: %d, err: %w",
res.StatusCode, err,
)
}
return fmt.Errorf("collector heartbeat request failed: %w",
ErrorAPI{
status: res.StatusCode,
body: buff.String(),
},
)
case http.StatusUnauthorized:
return errUnauthorizedHeartbeat
case http.StatusNoContent:
}
return nil
}
func baseURL() (string, error) {
// This doesn't connect, we just need the connection object.
c, err := net.Dial("udp", "255.255.255.255:53")
if err != nil {
return "", err
}
defer c.Close()
a := c.LocalAddr().(*net.UDPAddr)
h, _, err := net.SplitHostPort(a.String())
if err != nil {
return "", err
}
return h, nil
}
var sumoAppProcesses = map[string]string{
"apache": "apache",
"apache2": "apache",
"httpd": "apache",
"docker": "docker", // docker cli
"elasticsearch": "elasticsearch",
"mysql-server": "mysql",
"mysqld": "mysql",
"nginx": "nginx",
"postgresql": "postgres",
"postgresql-9.5": "postgres",
"rabbitmq-server": "rabbitmq",
"redis": "redis",
"tomcat": "tomcat",
"kafka-server-start.sh": "kafka", // Need to test this, most common shell wrapper.
"redis-server": "redis",
"mongod": "mongodb",
"cassandra": "cassandra",
"jmx": "jmx",
"activemq": "activemq",
"memcached": "memcached",
"haproxy": "haproxy",
"dockerd": "docker-ce", // docker engine, for when process runs natively
"com.docker.backend": "docker-ce", // docker daemon runs on a VM in Docker Desktop, process doesn't show on mac
"sqlservr": "mssql", // linux SQL Server process
}
func (se *SumologicExtension) filteredProcessList() ([]string, error) {
var pl []string
processes, err := process.Processes()
if err != nil {
return pl, err
}
for _, v := range processes {
e, err := v.Name()
if err != nil {
if runtime.GOOS == "windows" {
// On Windows, if we can't get a process name, it is likely a zombie process, assume that and skip them.
se.logger.Warn(
"Failed to get executable name, it is likely a zombie process, skipping it",
zap.Int32("pid", v.Pid),
zap.Error(err))
continue
}
return nil, fmt.Errorf("Error getting executable name: %w", err)
}
e = strings.ToLower(e)
if a, i := sumoAppProcesses[e]; i {
pl = append(pl, a)
}
// handling for Docker Desktop
if e == dockerDesktopJavaProcess {
pl = append(pl, "docker-ce")
}
// handling Java background processes
if e == "java" {
cmdline, err := v.Cmdline()
if err != nil {
return nil, fmt.Errorf("error getting executable name for PID %d: %w", v.Pid, err)
}
switch {
case strings.Contains(cmdline, cassandraJavaProcess):
pl = append(pl, "cassandra")
case strings.Contains(cmdline, jmxJavaProcess):
pl = append(pl, "jmx")
case strings.Contains(cmdline, activeMQJavaProcess):
pl = append(pl, "activemq")
}
}
}
return pl, nil
}
func (se *SumologicExtension) discoverTags() (map[string]any, error) {
t := map[string]any{
"sumo.disco.enabled": "true",
}
pl, err := se.filteredProcessList()
if err != nil {
return t, err
}
for _, v := range pl {
t["sumo.disco."+v] = 1 // Sumo does not allow empty tag values, let's set it to anything.
}
return t, nil
}
func (se *SumologicExtension) updateMetadataWithHTTPClient(ctx context.Context, httpClient *http.Client) error {
u, err := url.Parse(se.BaseURL() + metadataURL)
if err != nil {
return fmt.Errorf("unable to parse metadata URL %w", err)
}
info, err := host.Info()
if err != nil {
return err
}
hostname, err := getHostname(se.logger)
if err != nil {
return err
}
ip, err := baseURL()
if err != nil {
return err
}
td := map[string]any{}
if se.conf.DiscoverCollectorTags {
td, err = se.discoverTags()
if err != nil {
return err
}
}
for k, v := range se.conf.CollectorFields {
td[k] = v
}
var buff bytes.Buffer
if err = json.NewEncoder(&buff).Encode(api.OpenMetadataRequestPayload{
HostDetails: api.OpenMetadataHostDetails{
Name: hostname,
OsName: info.OS,
OsVersion: info.PlatformVersion,
Environment: se.conf.CollectorEnvironment,
},
CollectorDetails: api.OpenMetadataCollectorDetails{
RunningVersion: cleanupBuildVersion(se.buildVersion),
},
NetworkDetails: api.OpenMetadataNetworkDetails{
HostIPAddress: ip,
},
TagDetails: td,
}); err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), &buff)
if err != nil {
return fmt.Errorf("unable to create HTTP request %w", err)
}
addJSONHeaders(req)
se.logger.Info("Updating collector metadata",
zap.String("URL", u.String()),
zap.String("body", buff.String()))
res, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("unable to send HTTP request: %w", err)
}
defer res.Body.Close()
switch res.StatusCode {
default:
var buff bytes.Buffer
if _, err := io.Copy(&buff, res.Body); err != nil {
return fmt.Errorf(
"failed to copy collector metadata response body, status code: %d, err: %w",
res.StatusCode, err,
)
}
se.logger.Warn("Metadata API error response",
zap.Int("status", res.StatusCode),
zap.String("body", buff.String()))
return fmt.Errorf("collector metadata request failed: %w",
ErrorAPI{
status: res.StatusCode,
body: buff.String(),
},
)
case http.StatusUnauthorized:
return errUnauthorizedMetadata
case http.StatusNoContent:
case http.StatusOK:
}
return nil
}
func (se *SumologicExtension) updateMetadataWithBackoff(ctx context.Context) error {
se.backOff.Reset()
for {
err := se.updateMetadataWithHTTPClient(ctx, se.httpClient)
if err == nil {
return nil
}
se.logger.Warn(fmt.Sprintf("collector metadata update failed: %s", err))
nbo := se.backOff.NextBackOff()
var backOffErr *backoff.PermanentError
// Return error if backoff reaches the limit or uncoverable error is spotted
if ok := errors.As(err, &backOffErr); nbo == se.backOff.Stop || ok {
return fmt.Errorf("collector metadata update failed: %w", err)
}
t := time.NewTimer(nbo)
defer t.Stop()
select {
case <-t.C:
case <-ctx.Done():
return fmt.Errorf("collector metadata update cancelled: %w", ctx.Err())
}
}
}
func (se *SumologicExtension) ComponentID() component.ID {
return se.id
}
func (se *SumologicExtension) CollectorID() string {
return se.registrationInfo.CollectorID
}
func (se *SumologicExtension) BaseURL() string {
se.baseURLLock.RLock()
defer se.baseURLLock.RUnlock()
return se.baseURL
}
func (se *SumologicExtension) SetBaseURL(baseURL string) {
se.baseURLLock.Lock()
se.baseURL = baseURL
se.baseURLLock.Unlock()
}
func (se *SumologicExtension) StickySessionCookie() string {
se.stickySessionCookieLock.RLock()
defer se.stickySessionCookieLock.RUnlock()
return se.stickySessionCookie
}
func (se *SumologicExtension) SetStickySessionCookie(stickySessionCookie string) {
se.stickySessionCookieLock.Lock()
se.stickySessionCookie = stickySessionCookie
se.stickySessionCookieLock.Unlock()
}
// WatchCredentialKey watches for credential key updates. It makes use of a
// channel close (done by injectCredentials) and string comparison with a
// known/previous credential key (old). This function allows components to be
// proactive when dealing with changes to authentication.
func (se *SumologicExtension) WatchCredentialKey(ctx context.Context, old string) string {
se.credsNotifyLock.Lock()
v, ch := se.registrationInfo.CollectorCredentialKey, se.credsNotifyUpdate
se.credsNotifyLock.Unlock()
for v == old {
select {
case <-ctx.Done():
return v
case <-ch:
se.credsNotifyLock.Lock()
v, ch = se.registrationInfo.CollectorCredentialKey, se.credsNotifyUpdate
se.credsNotifyLock.Unlock()
}
}
return v
}
// CreateCredentialsHeader produces an HTTP header containing authentication
// credentials. This function is for components that do not make use of the
// RoundTripper or have an HTTP request to build upon.
func (se *SumologicExtension) CreateCredentialsHeader() (http.Header, error) {
id, key := se.registrationInfo.CollectorCredentialID, se.registrationInfo.CollectorCredentialKey
if id == "" || key == "" {
return nil, errors.New("collector credentials are not set")
}
token := base64.StdEncoding.EncodeToString(
[]byte(id + ":" + key),
)
header := http.Header{}
header.Set("Authorization", "Basic "+token)
return header, nil
}
// Implement [1] in order for this extension to be used as custom exporter
// authenticator.
//
// [1]: https://github.com/open-telemetry/opentelemetry-collector/blob/2e84285efc665798d76773b9901727e8836e9d8f/config/configauth/clientauth.go#L34-L39
func (se *SumologicExtension) RoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
return roundTripper{
collectorCredentialID: se.registrationInfo.CollectorCredentialID,
collectorCredentialKey: se.registrationInfo.CollectorCredentialKey,
addStickySessionCookie: se.addStickySessionCookie,
updateStickySessionCookie: se.updateStickySessionCookie,
base: base,
}, nil
}
func (se *SumologicExtension) addStickySessionCookie(req *http.Request) {
if !se.conf.StickySessionEnabled {
return
}
currentCookieValue := se.StickySessionCookie()
if currentCookieValue != "" {
cookie := &http.Cookie{
Name: stickySessionKey,
Value: currentCookieValue,
}
req.AddCookie(cookie)
}
}
func (se *SumologicExtension) updateStickySessionCookie(resp *http.Response) {
cookies := resp.Cookies()
if se.conf.StickySessionEnabled && len(cookies) > 0 {
for _, cookie := range cookies {
if cookie.Name == stickySessionKey {
if cookie.Value != se.StickySessionCookie() {
se.SetStickySessionCookie(cookie.Value)
}
return
}
}
}
}
type roundTripper struct {
collectorCredentialID string
collectorCredentialKey string
addStickySessionCookie func(*http.Request)
updateStickySessionCookie func(*http.Response)
base http.RoundTripper
}
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
addCollectorCredentials(req, rt.collectorCredentialID, rt.collectorCredentialKey)
rt.addStickySessionCookie(req)
resp, err := rt.base.RoundTrip(req)
if err != nil {
return nil, err
}
rt.updateStickySessionCookie(resp)
return resp, err
}
func addCollectorCredentials(req *http.Request, collectorCredentialID string, collectorCredentialKey string) {
token := base64.StdEncoding.EncodeToString(
[]byte(collectorCredentialID + ":" + collectorCredentialKey),
)
// Delete the existing Authorization header so prevent sending both the old one
// and the new one.
req.Header.Del("Authorization")
req.Header.Add("Authorization", "Basic "+token)
}
func addClientCredentials(req *http.Request, credentials accessCredentials) {
var authHeaderValue string
if credentials.InstallationToken != "" {
authHeaderValue = fmt.Sprintf("Bearer %s", string(credentials.InstallationToken))
}
req.Header.Del("Authorization")
req.Header.Add("Authorization", authHeaderValue)
}
// TODO(ck): hostname allows the darwin tests to bypass fqdn.
var hostname = fqdn.FqdnHostname
// getHostname returns the host name consistently with the resource detection processor's defaults
// TODO: try to dynamically extract this from the resource processor in the pipeline
func getHostname(logger *zap.Logger) (string, error) {
fqdnHostname, err := hostname()
if err == nil {
return fqdnHostname, nil
}
logger.Debug("failed to get fqdn", zap.Error(err))
return os.Hostname()
}
// cleanupBuildVersion adds a leading 'v' and removes the tailing build hash to make sure the
// backend understand the build number. Note that only version strings with the following format will be
// cleaned up. All other version formats will remain the same.
// Cleaned up format: 0.108.0-sumo-2-4d57200692d5c5c39effad4ae3b29fef79209113
func cleanupBuildVersion(version string) string {
pattern := "^v?([0-9]+\\.[0-9]+\\.[0-9]+-sumo-[0-9]+)(-[0-9a-f]{40}){0,1}(-fips){0,1}$"
re := regexp.MustCompile(pattern)
matches := re.FindAllStringSubmatch(version, 1)
if len(matches) != 1 {
return version
}
subMatches := matches[0]
if len(subMatches) > 1 {
ver := subMatches[1]
if len(subMatches) == 4 {
ver += subMatches[3]
}
return "v" + ver
}
return version
}