in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregator.java [258:408]
public void initialize(String shardId) throws Exception {
    // Set system properties to allow unlimited entity expansion in
    // response documents from the AWS API
    //
    // see https://blogs.oracle.com/joew/entry/jdk_7u45_aws_issue_123 for
    // more information
    System.setProperty("entityExpansionLimit", "0");
    System.setProperty("jdk.xml.entityExpansionLimit", "0");
    this.shardId = shardId;
    // establish whether we are running on the lowest shard on the basis
    // of hash range
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
            this.config.getKinesisCredentialsProvider());
    if (this.config.getRegionName() != null) {
        region = Region.getRegion(Regions.fromName(this.config
                .getRegionName()));
        kinesisClient.setRegion(region);
    }
    try {
        if (this.shardId.equals(StreamAggregatorUtils.getFirstShardName(
                kinesisClient, this.config.getStreamName()))) {
            this.isFirstShardWorker = true;
            logInfo("Aggregator taking Primary Thread Responsibility");
        }
    } catch (Exception e) {
        logWarn("Unable to establish if Worker Thread is Primary");
    }
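    // confirm the supplied configuration is complete before continuing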
    validateConfig();
    // set the default aggregator type
    if (this.aggregatorType == null) {
        this.aggregatorType = AggregatorType.COUNT;
    }
    if (this.dataExtractor == null)
        throw new InvalidConfigurationException(
                "Unable to create Aggregator Instance without a configured IDataExtractor");
    // set the aggregator type on the data extractor
    this.dataExtractor.setAggregatorType(this.aggregatorType);
    this.dataExtractor.validate();
    // create connections to dynamo and kinesis
    ClientConfiguration clientConfig = new ClientConfiguration()
            .withSocketTimeout(60000);
    this.dynamoClient = new AmazonDynamoDBAsyncClient(
            this.config.getDynamoDBCredentialsProvider(), clientConfig);
    if (region != null)
        this.dynamoClient.setRegion(region);
    this.kinesisClient = new AmazonKinesisClient(
            this.config.getKinesisCredentialsProvider());
    if (region != null)
        this.kinesisClient.setRegion(region);
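    // the inventory model tracks this aggregator's per-shard status and
    // checkpoints in DynamoDB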
    inventory = new InventoryModel(this.dynamoClient);
    // get the latest sequence number checkpointed for this named aggregator
    // on this shard
    InventoryStatus lastUpdate = inventory.getLastUpdate(this.streamName,
            this.applicationName, this.namespace, this.shardId);
    if (lastUpdate != null && lastUpdate.getHighSeq() != null) {
        // set the current high sequence to the last high sequence
        this.highSeq = new BigInteger(lastUpdate.getHighSeq());
    }
    // log that we are now starting up
    inventory.update(this.streamName, this.applicationName, this.namespace,
            this.shardId, null, null, System.currentTimeMillis(),
            InventoryModel.STATE.STARTING);
    // set the table name we will use for aggregated values
    if (this.tableName == null) {
        this.tableName = StreamAggregatorUtils.getTableName(
                config.getApplicationName(), this.getNamespace());
    }
    if (this.environment != null && !this.environment.equals(""))
        this.tableName = String.format("%s.%s", this.environment,
                this.tableName);
    // resolve the basic data being aggregated
    String labelColumn = StreamAggregatorUtils.methodToColumn(dataExtractor
            .getAggregateLabelName());
    String dateColumn = dataExtractor.getDateValueName() == null ? DEFAULT_DATE_VALUE
            : dataExtractor.getDateValueName();
    // configure the default dynamo data store
    if (this.dataStore == null) {
        this.dataStore = new DynamoDataStore(this.dynamoClient,
                this.kinesisClient, this.aggregatorType, this.streamName,
                this.tableName, labelColumn, dateColumn)
                .withStorageCapacity(this.readCapacity, this.writeCapacity);
        this.dataStore.setRegion(region);
    }
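    // prepare the configured data store before any aggregates are written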
    this.dataStore.initialise();
    // configure the cache so it can do its work
    cache = new AggregateCache(this.shardId)
            .withCredentials(this.config.getKinesisCredentialsProvider())
            .withAggregateType(this.aggregatorType)
            .withTableName(this.tableName).withLabelColumn(labelColumn)
            .withDateColumn(dateColumn).withDataStore(this.dataStore);
    // create a cloudwatch client for the cache to publish against if needed
    if (this.publishMetrics && this.metricsEmitter == null) {
        this.metricsEmitter = new CloudWatchMetricsEmitter(this.tableName,
                this.config.getCloudWatchCredentialsProvider());
    }
    if (this.metricsEmitter != null) {
        if (this.config.getRegionName() != null)
            this.metricsEmitter.setRegion(region);
    }
    // add the metrics emitter to the cache if one has been configured
    if (this.metricsEmitter != null) {
        cache.withMetricsEmitter(this.metricsEmitter);
    }
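    // bring the cache online now that it is fully configured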
    cache.initialise();
    // set the user agent
    StringBuilder userAgent = new StringBuilder(
            ClientConfiguration.DEFAULT_USER_AGENT);
    userAgent.append(" ");
    userAgent.append(this.AWSApplication);
    userAgent.append("/");
    userAgent.append(this.version);
    this.config.getKinesisClientConfiguration().setUserAgent(
            userAgent.toString());
    // log startup state
    StringBuffer sb = new StringBuffer();
    for (TimeHorizon t : timeHorizons) {
        sb.append(String.format("%s,", t.name()));
    }
    sb.deleteCharAt(sb.length() - 1);
    logInfo(String
            .format("Amazon Kinesis Stream Aggregator Online\nStream: %s\nApplication: %s\nNamespace: %s\nWorker: %s\nGranularity: %s\nContent Extracted With: %s",
                    streamName, applicationName, this.namespace,
                    this.config.getWorkerIdentifier(), sb.toString(),
                    dataExtractor.getClass().getName()));
    if (this.highSeq != null)
        logInfo(String.format("Processing Data from Seq: %s", this.highSeq));
    online = true;
}