public static void validateStreamSourceConfiguration()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java [77:115]


    /**
     * Validates the configuration supplied for a Kinesis Streams source.
     *
     * <p>The checks are performed in this order:
     *
     * <ol>
     *   <li>the configuration itself must not be {@code null};
     *   <li>the configuration is copied into a {@link Properties} instance and handed to {@code
     *       validateAwsConfiguration};
     *   <li>at least one of {@link AWSConfigConstants#AWS_REGION} or {@link
     *       AWSConfigConstants#AWS_ENDPOINT} must be present;
     *   <li>if the initial position is explicitly set to {@code AT_TIMESTAMP}, the initial
     *       timestamp option must be set, and its value is passed to {@code
     *       validateOptionalDateProperty} together with the configured timestamp date format.
     * </ol>
     *
     * @param config the source configuration to validate; must not be {@code null}
     * @throws IllegalArgumentException if neither region nor endpoint is configured, or if {@code
     *     AT_TIMESTAMP} is selected without an initial timestamp. The delegated validators may
     *     reject the configuration as well.
     */
    public static void validateStreamSourceConfiguration(Configuration config) {
        checkNotNull(config, "Config can not be null");

        // The AWS and date validators operate on java.util.Properties, so flatten the config once.
        Properties consumerProperties = new Properties();
        config.addAllToProperties(consumerProperties);
        validateAwsConfiguration(consumerProperties);

        if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
                || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
            // per validation in AwsClientBuilder
            throw new IllegalArgumentException(
                    String.format(
                            "For KinesisStreamsSource AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
                            AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
        }

        if (config.contains(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION)) {
            KinesisSourceConfigOptions.InitialPosition initPosType =
                    config.get(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION);

            // specified initial timestamp in stream when using AT_TIMESTAMP
            if (initPosType == KinesisSourceConfigOptions.InitialPosition.AT_TIMESTAMP) {
                if (!config.contains(STREAM_INITIAL_TIMESTAMP)) {
                    // Report the plain option key; concatenating the ConfigOption itself would
                    // embed its full toString() description in the message.
                    throw new IllegalArgumentException(
                            "Please set value for initial timestamp ('"
                                    + STREAM_INITIAL_TIMESTAMP.key()
                                    + "') when using AT_TIMESTAMP initial position.");
                }
                // The properties are keyed by the option's key. String.valueOf(option) would
                // produce the option's toString() instead, the lookup would never match, and the
                // timestamp value would not be checked.
                validateOptionalDateProperty(
                        consumerProperties,
                        STREAM_INITIAL_TIMESTAMP.key(),
                        config.get(STREAM_TIMESTAMP_DATE_FORMAT),
                        String.format(
                                "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
                                        + "Must be a valid format: ('%s') or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .",
                                config.get(STREAM_TIMESTAMP_DATE_FORMAT)));
            }
        }
    }