func newPutRecordBatcher()

in firehose/firehose.go [119:178]


func newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint string, pluginID int) (*firehose.Firehose, error) {
	customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
		if service == endpoints.FirehoseServiceID && firehoseEndpoint != "" {
			return endpoints.ResolvedEndpoint{
				URL: firehoseEndpoint,
			}, nil
		} else if service == endpoints.StsServiceID && stsEndpoint != "" {
			return endpoints.ResolvedEndpoint{
				URL: stsEndpoint,
			}, nil
		}
		return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
	}

	// Fetching base credentials
	baseConfig := &aws.Config{
		Region:                        aws.String(region),
		EndpointResolver:              endpoints.ResolverFunc(customResolverFn),
		CredentialsChainVerboseErrors: aws.Bool(true),
	}

	sess, err := session.NewSession(baseConfig)
	if err != nil {
		return nil, err
	}

	var svcSess = sess
	var svcConfig = baseConfig
	eksRole := os.Getenv("EKS_POD_EXECUTION_ROLE")
	if eksRole != "" {
		logrus.Debugf("[firehose %d] Fetching EKS pod credentials.\n", pluginID)
		eksConfig := &aws.Config{}
		creds := stscreds.NewCredentials(svcSess, eksRole)
		eksConfig.Credentials = creds
		eksConfig.Region = aws.String(region)
		svcConfig = eksConfig

		svcSess, err = session.NewSession(svcConfig)
		if err != nil {
			return nil, err
		}
	}
	if roleARN != "" {
		logrus.Debugf("[firehose %d] Fetching credentials for %s\n", pluginID, roleARN)
		stsConfig := &aws.Config{}
		creds := stscreds.NewCredentials(svcSess, roleARN)
		stsConfig.Credentials = creds
		stsConfig.Region = aws.String(region)
		svcConfig = stsConfig

		svcSess, err = session.NewSession(svcConfig)
		if err != nil {
			return nil, err
		}
	}

	client := firehose.New(svcSess, svcConfig)
	client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler())
	return client, nil
}