in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/proxy/KinesisProxy.java [146:210]
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);
KinesisConfigUtil.backfillConsumerKeys(configProps);
this.kinesisClient = createKinesisClient(configProps);
this.listShardsBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
this.listShardsMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
this.listShardsExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.listShardsMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
this.describeStreamBaseBackoffMillis = Long.parseLong(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
this.describeStreamMaxBackoffMillis = Long.parseLong(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
this.describeStreamExpConstant = Double.parseDouble(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
this.getRecordsMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
this.getRecordsExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
this.getShardIteratorBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
this.getShardIteratorMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
this.getShardIteratorExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getShardIteratorMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
}