in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java [77:115]
/**
 * Validates the configuration supplied for a Kinesis Streams source.
 *
 * <p>The checks run in this order:
 *
 * <ol>
 *   <li>the configuration is passed through {@code checkNotNull};
 *   <li>the configuration is copied into a {@link Properties} object and handed to {@code
 *       validateAwsConfiguration};
 *   <li>at least one of the AWS region or AWS endpoint keys must be present;
 *   <li>if an initial position is configured and it is {@code AT_TIMESTAMP}, an initial
 *       timestamp must be configured, and its value is checked against the configured
 *       timestamp date format via {@code validateOptionalDateProperty}.
 * </ol>
 *
 * @param config the source configuration to validate
 * @throws IllegalArgumentException if neither the AWS region nor the AWS endpoint is set, or if
 *     {@code AT_TIMESTAMP} is selected without an initial timestamp; the delegated validators
 *     may raise further exceptions of their own
 */
public static void validateStreamSourceConfiguration(Configuration config) {
    checkNotNull(config, "Config can not be null");

    // The AWS validators operate on java.util.Properties, so mirror the config into one.
    Properties consumerProperties = new Properties();
    config.addAllToProperties(consumerProperties);
    validateAwsConfiguration(consumerProperties);

    if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
            || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
        // per validation in AwsClientBuilder
        throw new IllegalArgumentException(
                String.format(
                        "For KinesisStreamsSource AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
                        AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
    }

    if (config.contains(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION)) {
        KinesisSourceConfigOptions.InitialPosition initPosType =
                config.get(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION);

        // specified initial timestamp in stream when using AT_TIMESTAMP
        if (initPosType == KinesisSourceConfigOptions.InitialPosition.AT_TIMESTAMP) {
            // Use the option's key, not the ConfigOption object itself: its string form is a
            // description of the option, which neither matches the property name copied into
            // consumerProperties nor tells the user which key to set.
            String initialTimestampKey = STREAM_INITIAL_TIMESTAMP.key();

            if (!config.contains(STREAM_INITIAL_TIMESTAMP)) {
                throw new IllegalArgumentException(
                        "Please set value for initial timestamp ('"
                                + initialTimestampKey
                                + "') when using AT_TIMESTAMP initial position.");
            }

            String timestampDateFormat = config.get(STREAM_TIMESTAMP_DATE_FORMAT);
            validateOptionalDateProperty(
                    consumerProperties,
                    initialTimestampKey,
                    timestampDateFormat,
                    String.format(
                            "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
                                    + "Must be a valid format: ('%s') or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .",
                            timestampDateFormat));
        }
    }
}