in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java [183:289]
public FanOutRecordPublisherConfiguration(final Properties configProps, final List<String> streams) {
Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
}
for (String stream : streams) {
String key = efoConsumerArn(stream);
if (configProps.containsKey(key)) {
streamConsumerArns.put(stream, configProps.getProperty(key));
}
}
this.subscribeToShardMaxRetries = Optional
.ofNullable(configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
this.subscribeToShardTimeout = Optional
.ofNullable(configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
this.subscribeToShardBaseBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE);
this.subscribeToShardMaxBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX);
this.subscribeToShardExpConstant = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT);
this.registerStreamBaseBackoffMillis = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE);
this.registerStreamMaxBackoffMillis = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX);
this.registerStreamExpConstant = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
this.registerStreamMaxRetries = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_RETRIES))
.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES);
this.registerStreamConsumerTimeout = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_TIMEOUT);
this.deregisterStreamBaseBackoffMillis = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE);
this.deregisterStreamMaxBackoffMillis = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX);
this.deregisterStreamExpConstant = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
this.deregisterStreamMaxRetries = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES))
.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES);
this.deregisterStreamConsumerTimeout = Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_TIMEOUT);
this.describeStreamMaxRetries = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_RETRIES))
.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRIES);
this.describeStreamBaseBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE);
this.describeStreamMaxBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX);
this.describeStreamExpConstant = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT);
this.describeStreamConsumerMaxRetries = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES))
.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES);
this.describeStreamConsumerBaseBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE);
this.describeStreamConsumerMaxBackoffMillis = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX))
.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX);
this.describeStreamConsumerExpConstant = Optional.ofNullable(
configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
}