public static KinesisProducerConfiguration getValidatedProducerConfiguration()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [477:513]


    public static KinesisProducerConfiguration getValidatedProducerConfiguration(
            Properties config) {
        checkNotNull(config, "config can not be null");

        validateAwsConfiguration(config);

        if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
            // per requirement in Amazon Kinesis Producer Library
            throw new IllegalArgumentException(
                    String.format(
                            "For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
                            AWSConfigConstants.AWS_REGION));
        }

        KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
        kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));

        kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

        // we explicitly lower the credential refresh delay (default is 5 seconds)
        // to avoid an ignorable interruption warning that occurs when shutting down the
        // KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
        kpc.setCredentialsRefreshDelay(100);

        // Override default values if they aren't specified by users
        if (!config.containsKey(RATE_LIMIT)) {
            kpc.setRateLimit(DEFAULT_RATE_LIMIT);
        }
        if (!config.containsKey(THREADING_MODEL)) {
            kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
        }
        if (!config.containsKey(THREAD_POOL_SIZE)) {
            kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
        }

        return kpc;
    }