in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [94:325]
/**
 * Validates the configuration properties supplied for the Kinesis consumer.
 *
 * <p>The checks run in this order: the config must be non-null; the AWS settings are checked
 * by {@code validateAwsConfiguration}; the record publisher type is checked and, when it is
 * {@code EFO}, the EFO settings are checked by {@code validateEfoConfiguration}; an AWS
 * region or an AWS endpoint must be present; the initial stream position (and its timestamp,
 * for {@code AT_TIMESTAMP}) must be valid; and finally every optional numeric tuning property
 * is checked by the matching {@code validateOptional...Property} helper.
 *
 * @param config the consumer properties to validate
 * @param streams the streams handed to the EFO validation; only used when the record
 *     publisher type is {@code EFO}
 * @throws IllegalArgumentException if neither AWS region nor AWS endpoint is set, if the
 *     initial position is not an {@link InitialPosition} constant, or if {@code AT_TIMESTAMP}
 *     is selected without an initial timestamp; the helper validators called from here may
 *     raise further exceptions that are not visible from this method
 */
public static void validateConsumerConfiguration(Properties config, List<String> streams) {
    checkNotNull(config, "config can not be null");

    validateAwsConfiguration(config);

    RecordPublisherType recordPublisherType = validateRecordPublisherType(config);

    if (recordPublisherType == RecordPublisherType.EFO) {
        validateEfoConfiguration(config, streams);
    }

    if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
            || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
        // per validation in AwsClientBuilder
        throw new IllegalArgumentException(
                String.format(
                        "For FlinkKinesisConsumer AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
                        AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
    }

    if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
        String initPosType =
                config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);

        // specified initial position in stream must be either LATEST, TRIM_HORIZON or
        // AT_TIMESTAMP. Parse it once and reuse the result below.
        InitialPosition initialPosition;
        try {
            initialPosition = InitialPosition.valueOf(initPosType);
        } catch (IllegalArgumentException e) {
            String validValues =
                    Arrays.stream(InitialPosition.values())
                            .map(Enum::name)
                            .collect(Collectors.joining(", "));
            // Keep the original exception as the cause so the offending value stays visible
            // in the stack trace.
            throw new IllegalArgumentException(
                    "Invalid initial position in stream set in config. Valid values are: "
                            + validValues,
                    e);
        }

        // specified initial timestamp in stream when using AT_TIMESTAMP
        if (initialPosition == InitialPosition.AT_TIMESTAMP) {
            if (!config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
                throw new IllegalArgumentException(
                        "Please set value for initial timestamp ('"
                                + STREAM_INITIAL_TIMESTAMP
                                + "') when using AT_TIMESTAMP initial position.");
            }
            // The date format used for parsing is the configured one, falling back to the
            // default format when none is configured.
            validateOptionalDateProperty(
                    config,
                    STREAM_INITIAL_TIMESTAMP,
                    config.getProperty(
                            STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
                    "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
                            + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
        }
    }

    // NOTE(review): the helpers below are named "...Positive..." while the messages say
    // "non-negative"; whether zero is accepted depends on the helpers - confirm there.

    // --- getRecords ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
            "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
            "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
            "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
            "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
            "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");

    // --- getShardIterator ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
            "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
            "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
            "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");

    // --- shard discovery / listShards ---
    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
            "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
            "Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
            "Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");

    // --- describeStreamConsumer ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_RETRIES,
            "Invalid value given for maximum retry attempts for describe stream consumer operation. Must be a valid non-negative int value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_MAX,
            "Invalid value given for describe stream consumer operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for describe stream consumer operation backoff exponential constant. Must be a valid non-negative double value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_BACKOFF_BASE,
            "Invalid value given for describe stream consumer operation base backoff milliseconds. Must be a valid non-negative long value.");

    // --- registerStream ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
            "Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS,
            "Invalid value given for maximum timeout for register stream consumer. Must be a valid non-negative integer value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
            "Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
            "Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");

    // --- deregisterStream ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
            "Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.DEREGISTER_STREAM_TIMEOUT_SECONDS,
            "Invalid value given for maximum timeout for deregister stream consumer. Must be a valid non-negative integer value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
            "Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
            "Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");

    // --- subscribeToShard ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
            "Invalid value given for maximum retry attempts for subscribe to shard operation. Must be a valid non-negative integer value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
            "Invalid value given for subscribe to shard operation base backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveLongProperty(
            config,
            ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
            "Invalid value given for subscribe to shard operation max backoff milliseconds. Must be a valid non-negative long value.");

    validateOptionalPositiveDoubleProperty(
            config,
            ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
            "Invalid value given for subscribe to shard operation backoff exponential constant. Must be a valid non-negative double value.");

    // The getRecords sleep interval additionally has a strict upper bound.
    if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
        checkArgument(
                Long.parseLong(
                                config.getProperty(
                                        ConsumerConfigConstants
                                                .SHARD_GETRECORDS_INTERVAL_MILLIS))
                        < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
                "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than "
                        + ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS
                        + " milliseconds.");
    }

    // --- EFO HTTP client ---
    validateOptionalPositiveIntProperty(
            config,
            ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
            "Invalid value given for EFO HTTP client max concurrency. Must be positive.");
}