public static void validateConsumerConfiguration()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/KinesisConfigUtil.java [112:261]


	/**
	 * Validates the configuration properties supplied for the Kinesis consumer.
	 *
	 * <p>Checks are performed in the order written: AWS settings, record publisher type (plus the EFO
	 * settings when that type is EFO), presence of an AWS region or endpoint, the initial stream
	 * position, and finally each optional numeric tuning property.
	 *
	 * @param config the consumer properties to validate; passed through {@code checkNotNull} first
	 * @param streams the stream names; only forwarded to the EFO validation when the record publisher type is EFO
	 * @throws IllegalArgumentException if neither the AWS region nor the AWS endpoint key is present, if the
	 *     configured initial position is not the name of an {@code InitialPosition} constant, or if
	 *     {@code AT_TIMESTAMP} is configured without an initial timestamp
	 */
	public static void validateConsumerConfiguration(Properties config, List<String> streams) {
		checkNotNull(config, "config can not be null");

		validateAwsConfiguration(config);

		RecordPublisherType recordPublisherType = validateRecordPublisherType(config);

		// EFO specific settings are only checked when EFO has been selected
		if (recordPublisherType == RecordPublisherType.EFO) {
			validateEfoConfiguration(config, streams);
		}

		if (!(config.containsKey(AWSConfigConstants.AWS_REGION) || config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
			// per validation in AwsClientBuilder
			throw new IllegalArgumentException(String.format("For FlinkKinesisConsumer AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
				AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
		}

		if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
			String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);

			// the configured value must be the name of one of the InitialPosition constants;
			// it is parsed once here and the parsed constant is reused below
			final InitialPosition initialPosition;
			try {
				initialPosition = InitialPosition.valueOf(initPosType);
			} catch (IllegalArgumentException e) {
				String validValues = Arrays.stream(InitialPosition.values())
					.map(Enum::name).collect(Collectors.joining(", "));
				// keep the original exception as the cause so the stack trace is not lost
				throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + validValues, e);
			}

			// an initial timestamp is mandatory when using AT_TIMESTAMP
			if (initialPosition == InitialPosition.AT_TIMESTAMP) {
				if (!config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
					throw new IllegalArgumentException("Please set value for initial timestamp ('"
						+ STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
				}
				// the timestamp is checked against the configured date format, falling back to the default format
				validateOptionalDateProperty(config,
					STREAM_INITIAL_TIMESTAMP,
					config.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
			}
		}

		// ---- getRecords settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
			"Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
			"Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
			"Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
			"Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
			"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");

		// ---- getShardIterator settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
			"Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
			"Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");

		// ---- shard discovery / list shards settings ----
		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
			"Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
			"Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");

		// ---- describe stream consumer settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES,
			"Invalid value given for maximum retry attempts for describe stream consumer operation. Must be a valid non-negative int value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX,
			"Invalid value given for describe stream consumer operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for describe stream consumer operation backoff exponential constant. Must be a valid non-negative double value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE,
			"Invalid value given for describe stream consumer operation base backoff milliseconds. Must be a valid non-negative long value.");

		// ---- register stream consumer settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
			"Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS,
			"Invalid value given for maximum timeout for register stream consumer. Must be a valid non-negative integer value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
			"Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
			"Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");

		// ---- deregister stream consumer settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
			"Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS,
			"Invalid value given for maximum timeout for deregister stream consumer. Must be a valid non-negative integer value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
			"Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
			"Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");

		// ---- subscribe to shard settings ----
		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
			"Invalid value given for maximum retry attempts for subscribe to shard operation. Must be a valid non-negative integer value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
			"Invalid value given for subscribe to shard operation base backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
			"Invalid value given for subscribe to shard operation max backoff milliseconds. Must be a valid non-negative long value.");

		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
			"Invalid value given for subscribe to shard operation backoff exponential constant. Must be a valid non-negative double value.");

		// upper bound for the getRecords interval; the same property has already been passed to
		// validateOptionalPositiveLongProperty above, before it is parsed here
		if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
			checkArgument(
				Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
					< ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
				"Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
					ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
			);
		}

		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
			"Invalid value given for EFO HTTP client max concurrency. Must be positive.");
	}