in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java [128:321]
public FanOutRecordPublisherConfiguration(
final Properties configProps, final List<String> streams) {
Preconditions.checkArgument(
configProps
.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)
.equals(RecordPublisherType.EFO.toString()),
"Only efo record publisher can register a FanOutProperties.");
KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
efoRegistrationType =
EFORegistrationType.valueOf(
configProps.getProperty(
ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
EFORegistrationType.EAGER.toString()));
// if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer
// name for each stream.
if (efoRegistrationType == EFORegistrationType.EAGER
|| efoRegistrationType == EFORegistrationType.LAZY) {
consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
}
for (String stream : streams) {
String key = efoConsumerArn(stream);
if (configProps.containsKey(key)) {
streamConsumerArns.put(stream, configProps.getProperty(key));
}
}
this.subscribeToShardMaxRetries =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
this.subscribeToShardTimeout =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
this.subscribeToShardBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE);
this.subscribeToShardMaxBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX);
this.subscribeToShardExpConstant =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble)
.orElse(
ConsumerConfigConstants
.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT);
this.registerStreamBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE);
this.registerStreamMaxBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX);
this.registerStreamExpConstant =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble)
.orElse(
ConsumerConfigConstants
.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
this.registerStreamMaxRetries =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES);
this.registerStreamConsumerTimeout =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_TIMEOUT);
this.deregisterStreamBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE);
this.deregisterStreamMaxBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX);
this.deregisterStreamExpConstant =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble)
.orElse(
ConsumerConfigConstants
.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
this.deregisterStreamMaxRetries =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES);
this.deregisterStreamConsumerTimeout =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_TIMEOUT);
this.describeStreamMaxRetries =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRIES);
this.describeStreamBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE);
this.describeStreamMaxBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX))
.map(Long::parseLong)
.orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX);
this.describeStreamExpConstant =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble)
.orElse(
ConsumerConfigConstants
.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT);
this.describeStreamConsumerMaxRetries =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES);
this.describeStreamConsumerBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE))
.map(Long::parseLong)
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE);
this.describeStreamConsumerMaxBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX))
.map(Long::parseLong)
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX);
this.describeStreamConsumerExpConstant =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants
.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT))
.map(Double::parseDouble)
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
}