in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [152:263]
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);
KinesisConfigUtil.backfillConsumerKeys(configProps);
this.kinesisClient = createKinesisClient(configProps);
this.listShardsBaseBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
Long.toString(
ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
this.listShardsMaxBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
Long.toString(
ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
this.listShardsExpConstant =
Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(
ConsumerConfigConstants
.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.listShardsMaxRetries =
Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_RETRIES,
Long.toString(
ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
this.describeStreamBaseBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
Long.toString(
ConsumerConfigConstants
.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
this.describeStreamMaxBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
Long.toString(
ConsumerConfigConstants
.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
this.describeStreamExpConstant =
Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants
.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(
ConsumerConfigConstants
.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsBaseBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
Long.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
this.getRecordsMaxBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
Long.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
this.getRecordsExpConstant =
Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants
.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsMaxRetries =
Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
this.getShardIteratorBaseBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
Long.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
this.getShardIteratorMaxBackoffMillis =
Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
Long.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
this.getShardIteratorExpConstant =
Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants
.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getShardIteratorMaxRetries =
Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(
ConsumerConfigConstants
.DEFAULT_SHARD_GETITERATOR_RETRIES)));
}