in src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java [343:383]
private KinesisProducer getKinesisProducer() {
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion(regionName);
config.setCredentialsProvider(IAMUtility.createCredentials(regionName, roleARN, roleExternalID, roleSessionName, roleDurationSeconds));
config.setMaxConnections(maxConnections);
if (!StringUtils.isNullOrEmpty(kinesisEndpoint))
config.setKinesisEndpoint(kinesisEndpoint);
config.setAggregationEnabled(aggregation);
// Limits the maximum allowed put rate for a shard, as a percentage of
// the
// backend limits.
config.setRateLimit(rateLimit);
// Maximum amount of time (milliseconds) a record may spend being
// buffered
// before it gets sent. Records may be sent sooner than this depending
// on the
// other buffering limits
config.setRecordMaxBufferedTime(maxBufferedTime);
// Set a time-to-live on records (milliseconds). Records that do not get
// successfully put within the limit are failed.
config.setRecordTtl(ttl);
// Controls the number of metrics that are uploaded to CloudWatch.
// Expected pattern: none|summary|detailed
config.setMetricsLevel(metricsLevel);
// Controls the granularity of metrics that are uploaded to CloudWatch.
// Greater granularity produces more metrics.
// Expected pattern: global|stream|shard
config.setMetricsGranularity(metricsGranuality);
// The namespace to upload metrics under.
config.setMetricsNamespace(metricsNameSpace);
return new KinesisProducer(config);
}