public FanOutRecordPublisherConfiguration()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java [183:289]


	public FanOutRecordPublisherConfiguration(final Properties configProps, final List<String> streams) {
		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);

		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
		}

		for (String stream : streams) {
			String key = efoConsumerArn(stream);
			if (configProps.containsKey(key)) {
				streamConsumerArns.put(stream, configProps.getProperty(key));
			}
		}

		this.subscribeToShardMaxRetries = Optional
			.ofNullable(configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
			.map(Integer::parseInt)
			.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
		this.subscribeToShardTimeout = Optional
				.ofNullable(configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
				.map(Integer::parseInt)
				.map(Duration::ofSeconds)
				.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
		this.subscribeToShardBaseBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE);
		this.subscribeToShardMaxBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX);
		this.subscribeToShardExpConstant = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT))
			.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT);

		this.registerStreamBaseBackoffMillis = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE);
		this.registerStreamMaxBackoffMillis = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX);
		this.registerStreamExpConstant = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
			.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
		this.registerStreamMaxRetries = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.REGISTER_STREAM_RETRIES))
			.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES);
		this.registerStreamConsumerTimeout = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS))
			.map(Integer::parseInt)
			.map(Duration::ofSeconds)
			.orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_TIMEOUT);

		this.deregisterStreamBaseBackoffMillis = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE);
		this.deregisterStreamMaxBackoffMillis = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX);
		this.deregisterStreamExpConstant = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
			.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
		this.deregisterStreamMaxRetries = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES))
			.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES);
		this.deregisterStreamConsumerTimeout = Optional.ofNullable(
			configProps.getProperty(
				ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS))
			.map(Integer::parseInt)
			.map(Duration::ofSeconds)
			.orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_TIMEOUT);

		this.describeStreamMaxRetries = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_RETRIES))
			.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRIES);
		this.describeStreamBaseBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE);
		this.describeStreamMaxBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX);
		this.describeStreamExpConstant = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))
			.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT);
		this.describeStreamConsumerMaxRetries = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES))
			.map(Integer::parseInt).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES);
		this.describeStreamConsumerBaseBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE);
		this.describeStreamConsumerMaxBackoffMillis = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX))
			.map(Long::parseLong).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX);
		this.describeStreamConsumerExpConstant = Optional.ofNullable(
			configProps.getProperty(ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT))
			.map(Double::parseDouble).orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
	}