public void initialize(String shardId)

in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregator.java [258:408]


	/**
	 * Initialises this aggregator instance for processing the supplied Kinesis
	 * shard: resolves region and primary-thread responsibility, validates
	 * configuration, creates the DynamoDB/Kinesis clients, restores the last
	 * checkpointed sequence number, and brings the data store and aggregate
	 * cache online.
	 *
	 * @param shardId the Kinesis shard this worker instance is bound to
	 * @throws Exception if configuration validation, data store initialisation
	 *             or cache initialisation fails
	 */
	public void initialize(String shardId) throws Exception {
		// Set System properties to allow entity expansion of unlimited items in
		// response documents from AWS API
		//
		// see https://blogs.oracle.com/joew/entry/jdk_7u45_aws_issue_123 for
		// more information
		System.setProperty("entityExpansionLimit", "0");
		System.setProperty("jdk.xml.entityExpansionLimit", "0");

		this.shardId = shardId;

		// establish we are running on the lowest shard on the basis of hash
		// range
		AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
				this.config.getKinesisCredentialsProvider());
		if (this.config.getRegionName() != null) {
			region = Region.getRegion(Regions.fromName(this.config
					.getRegionName()));
			kinesisClient.setRegion(region);
		}

		try {
			if (this.shardId.equals(StreamAggregatorUtils.getFirstShardName(
					kinesisClient, this.config.getStreamName()))) {
				this.isFirstShardWorker = true;
				logInfo("Aggregator taking Primary Thread Responsibility");
			}
		} catch (Exception e) {
			// best effort only - carry on without primary responsibility, but
			// include the cause so the failure is diagnosable from the logs
			logWarn("Unable to establish if Worker Thread is Primary: "
					+ e.getMessage());
		}

		validateConfig();

		// set the default aggregator type
		if (this.aggregatorType == null) {
			this.aggregatorType = AggregatorType.COUNT;
		}

		// a data extractor is mandatory - without it there is nothing to
		// aggregate
		if (this.dataExtractor == null)
			throw new InvalidConfigurationException(
					"Unable to create Aggregator Instance without a configured IDataExtractor");

		// set the aggregator type on the data extractor
		this.dataExtractor.setAggregatorType(this.aggregatorType);
		this.dataExtractor.validate();

		// create connections to dynamo and kinesis
		ClientConfiguration clientConfig = new ClientConfiguration()
				.withSocketTimeout(60000);
		this.dynamoClient = new AmazonDynamoDBAsyncClient(
				this.config.getDynamoDBCredentialsProvider(), clientConfig);
		if (region != null)
			this.dynamoClient.setRegion(region);

		this.kinesisClient = new AmazonKinesisClient(
				this.config.getKinesisCredentialsProvider());
		if (region != null)
			this.kinesisClient.setRegion(region);

		inventory = new InventoryModel(this.dynamoClient);

		// get the latest sequence number checkpointed for this named aggregator
		// on this shard
		InventoryStatus lastUpdate = inventory.getLastUpdate(this.streamName,
				this.applicationName, this.namespace, this.shardId);
		if (lastUpdate != null && lastUpdate.getHighSeq() != null) {
			// set the current high sequence to the last high sequence
			this.highSeq = new BigInteger(lastUpdate.getHighSeq());
		}

		// log that we are now starting up
		inventory.update(this.streamName, this.applicationName, this.namespace,
				this.shardId, null, null, System.currentTimeMillis(),
				InventoryModel.STATE.STARTING);

		// set the table name we will use for aggregated values
		if (this.tableName == null) {
			this.tableName = StreamAggregatorUtils.getTableName(
					config.getApplicationName(), this.getNamespace());
		}

		// prefix the table name with the environment label, if one is set
		if (this.environment != null && !this.environment.equals(""))
			this.tableName = String.format("%s.%s", this.environment,
					this.tableName);

		// resolve the basic data being aggregated
		String labelColumn = StreamAggregatorUtils.methodToColumn(dataExtractor
				.getAggregateLabelName());
		String dateColumn = dataExtractor.getDateValueName() == null ? DEFAULT_DATE_VALUE
				: dataExtractor.getDateValueName();

		// configure the default dynamo data store
		if (this.dataStore == null) {
			this.dataStore = new DynamoDataStore(this.dynamoClient,
					this.kinesisClient, this.aggregatorType, this.streamName,
					this.tableName, labelColumn, dateColumn)
					.withStorageCapacity(this.readCapacity, this.writeCapacity);
			this.dataStore.setRegion(region);
		}
		this.dataStore.initialise();

		// configure the cache so it can do its work
		cache = new AggregateCache(this.shardId)
				.withCredentials(this.config.getKinesisCredentialsProvider())
				.withAggregateType(this.aggregatorType)
				.withTableName(this.tableName).withLabelColumn(labelColumn)
				.withDateColumn(dateColumn).withDataStore(this.dataStore);

		// create a cloudwatch client for the cache to publish against if needed
		if (this.publishMetrics && this.metricsEmitter == null) {
			this.metricsEmitter = new CloudWatchMetricsEmitter(this.tableName,
					this.config.getCloudWatchCredentialsProvider());
		}

		if (this.metricsEmitter != null) {
			if (this.config.getRegionName() != null)
				this.metricsEmitter.setRegion(region);
		}
		// add the metrics publisher to the cache if we are bound to the lowest
		// shard
		if (this.metricsEmitter != null) {
			cache.withMetricsEmitter(this.metricsEmitter);
		}
		cache.initialise();

		// set the user agent so this library is identifiable in AWS request
		// logs
		StringBuilder userAgent = new StringBuilder(
				ClientConfiguration.DEFAULT_USER_AGENT);
		userAgent.append(" ");
		userAgent.append(this.AWSApplication);
		userAgent.append("/");
		userAgent.append(this.version);
		this.config.getKinesisClientConfiguration().setUserAgent(
				userAgent.toString());

		// log startup state - build a comma separated list of the configured
		// time horizons (StringBuilder: no synchronisation needed here)
		StringBuilder sb = new StringBuilder();
		for (TimeHorizon t : timeHorizons) {
			sb.append(String.format("%s,", t.name()));
		}
		// strip the trailing comma, guarding against an empty horizon list
		// which would otherwise throw StringIndexOutOfBoundsException
		if (sb.length() > 0) {
			sb.deleteCharAt(sb.length() - 1);
		}

		logInfo(String
				.format("Amazon Kinesis Stream Aggregator Online\nStream: %s\nApplication: %s\nNamespace: %s\nWorker: %s\nGranularity: %s\nContent Extracted With: %s",
						streamName, applicationName, this.namespace,
						this.config.getWorkerIdentifier(), sb.toString(),
						dataExtractor.getClass().getName()));
		if (this.highSeq != null)
			logInfo(String.format("Processing Data from Seq: %s", this.highSeq));
		online = true;
	}