func newPutRecordsClient()

in kinesis/kinesis.go [174:233]


func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string, pluginID int) (*kinesis.Kinesis, error) {
	customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
		if service == endpoints.KinesisServiceID && kinesisEndpoint != "" {
			return endpoints.ResolvedEndpoint{
				URL: kinesisEndpoint,
			}, nil
		} else if service == endpoints.StsServiceID && stsEndpoint != "" {
			return endpoints.ResolvedEndpoint{
				URL: stsEndpoint,
			}, nil
		}
		return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
	}

	// Fetch base credentials
	baseConfig := &aws.Config{
		Region:                        aws.String(awsRegion),
		EndpointResolver:              endpoints.ResolverFunc(customResolverFn),
		CredentialsChainVerboseErrors: aws.Bool(true),
	}

	sess, err := session.NewSession(baseConfig)
	if err != nil {
		return nil, err
	}

	var svcSess = sess
	var svcConfig = baseConfig
	eksRole := os.Getenv("EKS_POD_EXECUTION_ROLE")
	if eksRole != "" {
		logrus.Debugf("[kinesis %d] Fetching EKS pod credentials.\n", pluginID)
		eksConfig := &aws.Config{}
		creds := stscreds.NewCredentials(svcSess, eksRole)
		eksConfig.Credentials = creds
		eksConfig.Region = aws.String(awsRegion)
		svcConfig = eksConfig

		svcSess, err = session.NewSession(svcConfig)
		if err != nil {
			return nil, err
		}
	}
	if roleARN != "" {
		logrus.Debugf("[kinesis %d] Fetching credentials for %s\n", pluginID, roleARN)
		stsConfig := &aws.Config{}
		creds := stscreds.NewCredentials(svcSess, roleARN)
		stsConfig.Credentials = creds
		stsConfig.Region = aws.String(awsRegion)
		svcConfig = stsConfig

		svcSess, err = session.NewSession(svcConfig)
		if err != nil {
			return nil, err
		}
	}

	client := kinesis.New(svcSess, svcConfig)
	client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler())
	return client, nil
}