in aws-blog-kinesis-beanstalk-workers/src/main/java/com/amazonaws/services/kinesis/ManagedConsumer.java [94:156]
public void configure() throws Exception {
if (!isConfigured) {
validateConfig();
try {
String userAgent = "AWSKinesisManagedConsumer/" + this.version;
if (this.positionInStream != null) {
streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
} else {
streamPosition = InitialPositionInStream.LATEST;
}
// append the environment name to the application name
if (environmentName != null) {
appName = String.format("%s-%s", appName, environmentName);
}
// ensure the JVM will refresh the cached IP values of AWS
// resources
// (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
LOG.info("Using Worker ID: " + workerId);
// obtain credentials using the default provider chain or the
// credentials provider supplied
AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain()
: this.credentialsProvider;
LOG.info("Using credentials with Access Key ID: "
+ credentialsProvider.getCredentials().getAWSAccessKeyId());
config = new KinesisClientLibConfiguration(appName, streamName,
credentialsProvider, workerId).withInitialPositionInStream(streamPosition).withKinesisEndpoint(
kinesisEndpoint);
config.getKinesisClientConfiguration().setUserAgent(userAgent);
if (regionName != null) {
Region region = Region.getRegion(Regions.fromName(regionName));
config.withRegionName(region.getName());
}
if (this.maxRecords != -1)
config.withMaxRecords(maxRecords);
if (this.positionInStream != null)
config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));
LOG.info(String.format(
"Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
config.getApplicationName(), config.getStreamName(),
config.getRegionName(), config.getWorkerIdentifier(),
config.getMaxRecords()));
isConfigured = true;
} catch (Exception e) {
throw new InvalidConfigurationException(e);
}
}
}