in seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/src/main/java/org/apache/seatunnel/datasource/plugin/s3/HadoopS3AConfiguration.java [43:91]
public static Configuration getConfiguration(Map<String, String> s3Options) {
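    // Fail fast when the required bucket or endpoint option is missing.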
    if (!s3Options.containsKey(S3OptionRule.BUCKET.key())) {
        throw new IllegalArgumentException(
                "S3 datasource bucket is null, please check your config");
    }
    if (!s3Options.containsKey(S3OptionRule.FS_S3A_ENDPOINT.key())) {
        throw new IllegalArgumentException(
                "S3 datasource endpoint is null, please check your config");
    }
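    // Derive the protocol from the bucket URI and pick the matching Hadoop
    // FileSystem implementation (s3a vs. s3n).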
    String bucket = s3Options.get(S3OptionRule.BUCKET.key());
    String protocol = DEFAULT_PROTOCOL;
    if (bucket.startsWith(S3A_PROTOCOL)) {
        protocol = S3A_PROTOCOL;
    }
    String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
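    // Base configuration: default filesystem, S3 endpoint, and filesystem implementation class.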
    Configuration hadoopConf = new Configuration();
    hadoopConf.set(FS_DEFAULT_NAME_KEY, bucket);
    hadoopConf.set(
            S3OptionRule.FS_S3A_ENDPOINT.key(),
            s3Options.get(S3OptionRule.FS_S3A_ENDPOINT.key()));
    hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
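    // Merge any extra user-supplied Hadoop properties (HOCON text) into the configuration.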
    if (s3Options.containsKey(S3OptionRule.HADOOP_S3_PROPERTIES.key())) {
        Config configObject =
                ConfigFactory.parseString(
                        s3Options.get(S3OptionRule.HADOOP_S3_PROPERTIES.key()));
        configObject
                .entrySet()
                .forEach(
                        entry -> {
                            hadoopConf.set(
                                    entry.getKey(), entry.getValue().unwrapped().toString());
                        });
    }
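    // Credentials: SimpleAWSCredentialsProvider also needs the static access/secret key pair;
    // any other provider is passed through as-is.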
    if (S3OptionRule.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider.getProvider()
            .equals(s3Options.get(S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()))) {
        hadoopConf.set(
                S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                s3Options.get(S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
        hadoopConf.set("fs.s3a.access.key", s3Options.get(S3OptionRule.ACCESS_KEY.key()));
        hadoopConf.set("fs.s3a.secret.key", s3Options.get(S3OptionRule.SECRET_KEY.key()));
    } else if (s3Options.containsKey(S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key())) {
        // Configuration#set rejects null values, so only pass the provider class through
        // when it was actually supplied.
        hadoopConf.set(
                S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                s3Options.get(S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
    }
    return hadoopConf;
}
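
For context, a minimal usage sketch of getConfiguration follows. The example class name is invented for illustration; it assumes S3OptionRule lives in the same package as HadoopS3AConfiguration (as the unqualified references above suggest), that the hadoop-aws client and its AWS SDK dependency are on the classpath, and that the endpoint, access key, and secret key values are placeholders. Opening a FileSystem at the end is just one way to consume the returned Configuration, not part of the plugin itself.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.seatunnel.datasource.plugin.s3.HadoopS3AConfiguration;
import org.apache.seatunnel.datasource.plugin.s3.S3OptionRule;

public class HadoopS3AConfigurationExample {
    public static void main(String[] args) throws Exception {
        Map<String, String> s3Options = new HashMap<>();
        s3Options.put(S3OptionRule.BUCKET.key(), "s3a://my-bucket");
        s3Options.put(S3OptionRule.FS_S3A_ENDPOINT.key(), "s3.eu-west-1.amazonaws.com");
        // String.valueOf is used because the return type of getProvider() is not shown above.
        s3Options.put(
                S3OptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                String.valueOf(
                        S3OptionRule.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider
                                .getProvider()));
        s3Options.put(S3OptionRule.ACCESS_KEY.key(), "<access-key>");
        s3Options.put(S3OptionRule.SECRET_KEY.key(), "<secret-key>");

        // Build the Hadoop configuration and open the bucket as the default filesystem.
        Configuration hadoopConf = HadoopS3AConfiguration.getConfiguration(s3Options);
        try (FileSystem fs = FileSystem.get(hadoopConf)) {
            // List the bucket root simply to verify that the configuration is usable.
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
        }
    }
}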