func()

in plugins/outputs/awscsm/awscsm.go [114:212]


// Connect wires up everything the CSM output needs before it can publish.
//
// In order, it:
//   - creates the plugin logger from c.LogLevel;
//   - queries the EC2 instance identity document (a failed lookup is not an
//     error; the instance metadata is simply left unset);
//   - picks the region: the configured c.Region, or the region from the
//     identity document when c.Region is empty and the lookup succeeded;
//   - builds a shared session and, from it, the control-plane client
//     (pointed at c.EndpointOverride when set, otherwise at the regional
//     control.sdkmetrics endpoint) and the data-plane client (retries
//     disabled), both sending a custom User-Agent header;
//   - installs the package-level providers.Config and
//     metametrics.MetricListener;
//   - starts c.publishJob in its own goroutine.
//
// The returned error is always nil.
func (c *CSM) Connect() error {
	c.logger = newLogger(c.LogLevel)

	credCfg := &configaws.CredentialConfig{
		Region:    c.Region,
		AccessKey: c.AccessKey,
		SecretKey: c.SecretKey,
		RoleARN:   c.RoleARN,
		Profile:   c.Profile,
		Filename:  c.Filename,
		Token:     c.Token,
	}

	// The identity document is only available on EC2; elsewhere the call
	// fails and we carry on with whatever region was configured.
	identity, lookupErr := ec2metadata.New(credCfg.Credentials()).GetInstanceIdentityDocument()
	region := c.Region
	if lookupErr == nil {
		c.instanceMetadata = &identity
		if region == "" {
			region = identity.Region
		}
		c.logger.Log("EC2Metadata found")
	}

	c.logger.Log("Using region " + region)

	// Credentials are resolved a second time below so that the session is
	// created with the final region.
	credCfg.Region = region

	c.queueCh = models.AwsCsmOutputChannel
	// Random offset in [0, 60s).
	c.publishingOffset = time.Duration(rand.Int63n(int64(60 * time.Second)))

	sessionCfg := credCfg.Credentials().ClientConfig(csm.ServiceName, &aws.Config{
		LogLevel: configaws.SDKLogLevel(),
		Logger:   configaws.SDKLogger{},
	})
	if c.LogLevel > 0 {
		sessionCfg.Config.LogLevel = aws.LogLevel(aws.LogDebugWithHTTPBody)
	}
	// The shared session always logs through the standard library logger,
	// replacing the SDKLogger set just above.
	sessionCfg.Config.Logger = aws.LoggerFunc(func(msg ...interface{}) {
		log.Println(msg...)
	})

	sharedSession := session.New(sessionCfg.Config)

	// An explicit override wins; otherwise use the regional endpoint.
	controlEndpoint := c.EndpointOverride
	if len(controlEndpoint) == 0 {
		controlEndpoint = fmt.Sprintf("https://control.sdkmetrics.%s.amazonaws.com", region)
	}

	controlplane := csm.New(sharedSession, &aws.Config{
		Endpoint: aws.String(controlEndpoint),
		LogLevel: configaws.SDKLogLevel(),
		Logger:   configaws.SDKLogger{},
	})
	//TODO: work on this when we have a proper versioning mechanism
	//TODO: we need to find a way to expose enabled plugins
	//TODO: custom metrics adoption rate detection and be able to monitor any plugin enable rate
	userAgent := fmt.Sprintf("%s/%s (%s; %s; %s) %s", "CWAgent/CSM", "1.0", runtime.Version(), runtime.GOOS, runtime.GOARCH, "list of enabled input/output plugins")
	userAgentHandler := handlers.NewCustomHeaderHandler("User-Agent", userAgent)
	controlplane.Handlers.Build.PushBackNamed(userAgentHandler)

	providers.Config = providers.NewCSMConfigProvider(controlplane, providers.DefaultInterval)
	c.sendRecordLimit = providers.Config.RetrieveAgentConfig().Limits.MaxRecords

	// Describe the host; the EC2-specific fields are filled in only when the
	// identity document was retrieved.
	hostOS := runtime.GOOS
	var hostEnv csm.HostEnvironment
	hostEnv.Os = &hostOS
	if meta := c.instanceMetadata; meta != nil {
		hostEnv.AvailabilityZone = &meta.AvailabilityZone
		hostEnv.InstanceId = &meta.InstanceID
		hostEnv.Properties = []*string{aws.String(sdkmetricsdataplane.EnvironmentPropertyTagEc2)}
	}

	metametrics.MetricListener = metametrics.NewListenerAndStart(
		NewCSMWriter(controlplane, hostEnv),
		1000,
		5*time.Minute,
	)

	dataplane := sdkmetricsdataplane.New(sharedSession, &aws.Config{
		MaxRetries: aws.Int(0),
		LogLevel:   configaws.SDKLogLevel(),
		Logger:     configaws.SDKLogger{},
	})
	dataplane.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", userAgent))
	c.dataplaneClient = dataplane

	go c.publishJob()

	return nil
}