public static void validateConsumerConfiguration()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [94:325]


    /**
     * Validates the configuration properties supplied for a Kinesis consumer.
     *
     * <p>The checks run in a fixed order and the first failing check aborts validation:
     *
     * <ol>
     *   <li>{@code config} must not be null (enforced via {@code checkNotNull}).
     *   <li>General AWS settings are delegated to {@code validateAwsConfiguration}.
     *   <li>The record publisher type is validated; when it is {@code EFO} the EFO-specific
     *       settings are validated together with {@code streams}.
     *   <li>At least one of the AWS region or AWS endpoint keys must be present.
     *   <li>If an initial stream position is configured it must name an {@code InitialPosition}
     *       constant; for {@code AT_TIMESTAMP} an initial timestamp must also be present and is
     *       checked against the configured (or default) timestamp date format.
     *   <li>Every optional numeric tuning property that is present is handed to the matching
     *       {@code validateOptionalPositive*Property} helper.
     *   <li>If configured, the getRecords sleep interval must be strictly lower than {@code
     *       ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS}.
     * </ol>
     *
     * @param config the consumer configuration to validate; must not be null
     * @param streams the streams the consumer reads from; only forwarded to the EFO validation
     * @throws IllegalArgumentException if neither region nor endpoint is set, if the initial
     *     position is not a valid {@code InitialPosition} name, or if {@code AT_TIMESTAMP} is used
     *     without an initial timestamp. NOTE(review): the delegated helpers presumably also
     *     signal invalid values with this exception type using the supplied message - confirm
     *     against their implementations.
     */
    public static void validateConsumerConfiguration(Properties config, List<String> streams) {
        checkNotNull(config, "config can not be null");

        validateAwsConfiguration(config);

        RecordPublisherType recordPublisherType = validateRecordPublisherType(config);

        if (recordPublisherType == RecordPublisherType.EFO) {
            validateEfoConfiguration(config, streams);
        }

        if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
                || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
            // per validation in AwsClientBuilder
            throw new IllegalArgumentException(
                    String.format(
                            "For FlinkKinesisConsumer AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
                            AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
        }

        if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
            String initPosType =
                    config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);

            // specified initial position in stream must be either LATEST, TRIM_HORIZON or
            // AT_TIMESTAMP; parse it once and reuse the result for the AT_TIMESTAMP check below
            final InitialPosition initialPosition;
            try {
                initialPosition = InitialPosition.valueOf(initPosType);
            } catch (IllegalArgumentException e) {
                String validValues =
                        Arrays.stream(InitialPosition.values())
                                .map(Enum::name)
                                .collect(Collectors.joining(", "));
                // keep the original exception as the cause so the offending value stays
                // visible in the stack trace
                throw new IllegalArgumentException(
                        "Invalid initial position in stream set in config. Valid values are: "
                                + validValues,
                        e);
            }

            // specified initial timestamp in stream when using AT_TIMESTAMP
            if (initialPosition == InitialPosition.AT_TIMESTAMP) {
                if (!config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
                    throw new IllegalArgumentException(
                            "Please set value for initial timestamp ('"
                                    + STREAM_INITIAL_TIMESTAMP
                                    + "') when using AT_TIMESTAMP initial position.");
                }
                // the timestamp is checked against the user-supplied date format when one is
                // configured, otherwise against the default format
                validateOptionalDateProperty(
                        config,
                        STREAM_INITIAL_TIMESTAMP,
                        config.getProperty(
                                STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
                        "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
                                + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
            }
        }

        // ---- getRecords settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
                "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
                "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
                "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");

        // ---- getShardIterator settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
                "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
                "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
                "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");

        // ---- shard discovery / list shards settings ----
        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
                "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
                "Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
                "Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");

        // ---- describe stream consumer settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES,
                "Invalid value given for maximum retry attempts for describe stream consumer operation. Must be a valid non-negative int value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX,
                "Invalid value given for describe stream consumer operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for describe stream consumer operation backoff exponential constant. Must be a valid non-negative double value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE,
                "Invalid value given for describe stream consumer operation base backoff milliseconds. Must be a valid non-negative long value.");

        // ---- register stream consumer settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
                "Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS,
                "Invalid value given for maximum timeout for register stream consumer. Must be a valid non-negative integer value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
                "Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
                "Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");

        // ---- deregister stream consumer settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
                "Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS,
                "Invalid value given for maximum timeout for deregister stream consumer. Must be a valid non-negative integer value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
                "Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
                "Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");

        // ---- subscribe to shard settings ----
        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
                "Invalid value given for maximum retry attempts for subscribe to shard operation. Must be a valid non-negative integer value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
                "Invalid value given for subscribe to shard operation base backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveLongProperty(
                config,
                ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
                "Invalid value given for subscribe to shard operation max backoff milliseconds. Must be a valid non-negative long value.");

        validateOptionalPositiveDoubleProperty(
                config,
                ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
                "Invalid value given for subscribe to shard operation backoff exponential constant. Must be a valid non-negative double value.");

        // Upper bound for the getRecords sleep interval. This runs after the property has
        // already been passed through validateOptionalPositiveLongProperty above.
        if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
            checkArgument(
                    Long.parseLong(
                                    config.getProperty(
                                            ConsumerConfigConstants
                                                    .SHARD_GETRECORDS_INTERVAL_MILLIS))
                            < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
                    "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than "
                            + ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS
                            + " milliseconds.");
        }

        validateOptionalPositiveIntProperty(
                config,
                ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
                "Invalid value given for EFO HTTP client max concurrency. Must be positive.");
    }