in src/main/java/com/amazonaws/services/kinesis/aggregators/consumer/AggregatorConsumer.java [132:192]
/**
 * Prepares this consumer for use by building the Kinesis Client Library
 * configuration and the aggregators it will drive.
 * <p>
 * The work is performed only while {@code isConfigured} is false; once a
 * call completes successfully the flag is set and later calls return
 * immediately.
 *
 * @throws Exception
 *             if configuration validation, credential lookup, region
 *             resolution or aggregator construction fails
 */
public void configure() throws Exception {
    if (isConfigured) {
        return;
    }

    validateConfig();

    // default to reading from the tip of the stream when no position was
    // supplied
    if (this.positionInStream != null) {
        streamPosition = InitialPositionInStream
                .valueOf(this.positionInStream);
    } else {
        streamPosition = InitialPositionInStream.LATEST;
    }

    // append the environment name to the application name
    // NOTE(review): appName is modified in place before isConfigured is
    // set, so if a later step in this method throws and configure() is
    // invoked again, the environment suffix is appended a second time.
    if (environmentName != null) {
        appName = String.format("%s-%s", appName, environmentName);
    }

    // ensure the JVM will refresh the cached IP values of AWS resources
    // (e.g. service endpoints).
    java.security.Security.setProperty("networkaddress.cache.ttl", "60");

    // Build the worker ID from the local host name plus a random UUID.
    // NetworkInterface.getNetworkInterfaces() returns an Enumeration, so
    // concatenating it into a String only yields the enumeration object's
    // default toString() rather than anything identifying this host.
    String hostName;
    try {
        hostName = java.net.InetAddress.getLocalHost()
                .getCanonicalHostName();
    } catch (java.net.UnknownHostException e) {
        // the UUID suffix still keeps the worker ID unique, so an
        // unresolvable host name should not abort configuration
        hostName = "unknown-host";
        LOG.info("Unable to resolve local host name for Worker ID: "
                + e.getMessage());
    }
    String workerId = hostName + ":" + UUID.randomUUID();
    LOG.info("Using Worker ID: " + workerId);

    // obtain credentials using the default provider chain or the
    // credentials provider supplied
    AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
            ? new DefaultAWSCredentialsProviderChain()
            : this.credentialsProvider;
    LOG.info("Using credentials with Access Key ID: "
            + credentialsProvider.getCredentials().getAWSAccessKeyId());

    config = new KinesisClientLibConfiguration(appName, streamName,
            credentialsProvider, workerId)
            .withInitialPositionInStream(streamPosition)
            .withKinesisEndpoint(kinesisEndpoint);
    config.getKinesisClientConfiguration().setUserAgent(
            StreamAggregator.AWSApplication);

    // a region is only applied when one was configured
    if (regionName != null) {
        Region region = Region.getRegion(Regions.fromName(regionName));
        config.withRegionName(region.getName());
    }

    // -1 means no explicit max records value was configured
    if (maxRecords != -1) {
        config.withMaxRecords(maxRecords);
    }

    // initialise the Aggregators
    aggGroup = buildAggregatorsFromConfig();

    LOG.info(String
            .format("Amazon Kinesis Aggregators Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(),
                    config.getStreamName(), config.getRegionName(),
                    config.getWorkerIdentifier(),
                    config.getMaxRecords()));

    isConfigured = true;
}