in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java [477:513]
/**
 * Validates the supplied properties and turns them into a {@link KinesisProducerConfiguration}.
 *
 * <p>The properties are first passed through {@code validateAwsConfiguration}. The AWS region key
 * must be present in {@code config} itself; it is then applied to the resulting configuration
 * together with the credentials provider resolved by {@code AWSUtil.getCredentialsProvider}.
 * The rate limit, threading model and thread pool size are set to the connector's own defaults
 * whenever their keys are not contained in {@code config}.
 *
 * @param config properties describing the producer; must not be {@code null}
 * @return the producer configuration built from {@code config}
 * @throws IllegalArgumentException if {@code config} does not contain the AWS region key
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(
        Properties config) {
    checkNotNull(config, "config can not be null");
    validateAwsConfiguration(config);

    // The Amazon Kinesis Producer Library requires the region to be given explicitly.
    final boolean regionConfigured = config.containsKey(AWSConfigConstants.AWS_REGION);
    if (!regionConfigured) {
        final String message =
                String.format(
                        "For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
                        AWSConfigConstants.AWS_REGION);
        throw new IllegalArgumentException(message);
    }

    final KinesisProducerConfiguration producerConfiguration =
            KinesisProducerConfiguration.fromProperties(config);

    final String region = config.getProperty(AWSConfigConstants.AWS_REGION);
    producerConfiguration.setRegion(region);
    producerConfiguration.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

    // The credential refresh delay is deliberately lowered (the default is 5 seconds) so that
    // shutting down the KPL client does not produce an ignorable interruption warning.
    // See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
    producerConfiguration.setCredentialsRefreshDelay(100);

    applyDefaultsForUnsetProducerOptions(config, producerConfiguration);
    return producerConfiguration;
}

/**
 * Sets the connector defaults for rate limit, threading model and thread pool size on
 * {@code producerConfiguration} for every one of those keys that {@code config} does not contain.
 */
private static void applyDefaultsForUnsetProducerOptions(
        Properties config, KinesisProducerConfiguration producerConfiguration) {
    // containsKey is used on purpose: only keys set directly on config count as user-specified.
    if (!config.containsKey(RATE_LIMIT)) {
        producerConfiguration.setRateLimit(DEFAULT_RATE_LIMIT);
    }
    if (!config.containsKey(THREADING_MODEL)) {
        producerConfiguration.setThreadingModel(DEFAULT_THREADING_MODEL);
    }
    if (!config.containsKey(THREAD_POOL_SIZE)) {
        producerConfiguration.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
    }
}