public FanOutRecordPublisherConfiguration()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java [132:327]


    public FanOutRecordPublisherConfiguration(
            final Properties configProps, final List<String> streams) {
        Preconditions.checkArgument(
                configProps
                        .getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)
                        .equals(RecordPublisherType.EFO.toString()),
                "Only efo record publisher can register a FanOutProperties.");
        KinesisConfigUtil.validateEfoConfiguration(configProps, streams);

        efoRegistrationType =
                EFORegistrationType.valueOf(
                        configProps.getProperty(
                                ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
                                EFORegistrationType.EAGER.toString()));
        // if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer
        // name for each stream.
        if (efoRegistrationType == EFORegistrationType.EAGER
                || efoRegistrationType == EFORegistrationType.LAZY) {
            consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
        }

        for (String stream : streams) {
            String key = efoConsumerArn(stream);
            if (configProps.containsKey(key)) {
                streamConsumerArns.put(stream, configProps.getProperty(key));
            }
        }

        this.subscribeToShardMaxRetries =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
                        .map(Integer::parseInt)
                        .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
        this.subscribeToShardTimeout =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
                        .map(Integer::parseInt)
                        .map(Duration::ofSeconds)
                        .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
        this.subscribeToShardBaseBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE);
        this.subscribeToShardMaxBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX);
        this.subscribeToShardExpConstant =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT))
                        .map(Double::parseDouble)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT);

        this.registerStreamBaseBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE);
        this.registerStreamMaxBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX);
        this.registerStreamExpConstant =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
                        .map(Double::parseDouble)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
        this.registerStreamMaxRetries =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.REGISTER_STREAM_RETRIES))
                        .map(Integer::parseInt)
                        .orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES);
        this.registerStreamConsumerTimeout =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS))
                        .map(Integer::parseInt)
                        .map(Duration::ofSeconds)
                        .orElse(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_TIMEOUT);

        this.deregisterStreamBaseBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE);
        this.deregisterStreamMaxBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX);
        this.deregisterStreamExpConstant =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT))
                        .map(Double::parseDouble)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT);
        this.deregisterStreamMaxRetries =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES))
                        .map(Integer::parseInt)
                        .orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES);
        this.deregisterStreamConsumerTimeout =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS))
                        .map(Integer::parseInt)
                        .map(Duration::ofSeconds)
                        .orElse(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_TIMEOUT);

        this.describeStreamMaxRetries =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.STREAM_DESCRIBE_RETRIES))
                        .map(Integer::parseInt)
                        .orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRIES);
        this.describeStreamBaseBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE);
        this.describeStreamMaxBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX))
                        .map(Long::parseLong)
                        .orElse(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX);
        this.describeStreamExpConstant =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))
                        .map(Double::parseDouble)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT);
        this.describeStreamConsumerMaxRetries =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES))
                        .map(Integer::parseInt)
                        .orElse(ConsumerConfigConstants.DEFAULT_DESCRIBE_STREAM_CONSUMER_RETRIES);
        this.describeStreamConsumerBaseBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE))
                        .map(Long::parseLong)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE);
        this.describeStreamConsumerMaxBackoffMillis =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX))
                        .map(Long::parseLong)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX);
        this.describeStreamConsumerExpConstant =
                Optional.ofNullable(
                                configProps.getProperty(
                                        ConsumerConfigConstants
                                                .DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT))
                        .map(Double::parseDouble)
                        .orElse(
                                ConsumerConfigConstants
                                        .DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);

        this.recoverableErrorsConfig = this.parseRecoverableErrorConfig(configProps);
    }