in automation/src/main/java/org/greenplum/pxf/automation/components/hdfs/Hdfs.java [112:207]
public void init() throws Exception {
super.init();
ReportUtils.startLevel(report, getClass(), "Init");
config = new Configuration();
// if hadoop root exists in the SUT file, load configuration from it
if (StringUtils.isNotEmpty(hadoopRoot)) {
hadoopRoot = replaceUser(hadoopRoot);
ReportUtils.startLevel(report, getClass(), "Using root directory: " + hadoopRoot);
ProtocolEnum protocol = ProtocolUtils.getProtocol();
if (protocol == ProtocolEnum.HDFS) {
config.addResource(new Path(getHadoopRoot() + "/conf/core-site.xml"));
config.addResource(new Path(getHadoopRoot() + "/conf/hdfs-site.xml"));
config.addResource(new Path(getHadoopRoot() + "/conf/mapred-site.xml"));
} else {
// e.g., for the s3 protocol the configuration file is s3-site.xml
config.addResource(new Path(getHadoopRoot() + "/" + protocol.value() + "-site.xml"));
config.addResource(new Path(getHadoopRoot() + "/mapred-site.xml"));
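// point fs.defaultFS at the object store working directory using the configured scheme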
config.set("fs.defaultFS", getScheme() + "://" + getWorkingDirectory());
}
} else {
if (StringUtils.isEmpty(host)) {
throw new Exception("host in hdfs component not configured in SUT");
}
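// with an HA nameservice, register both namenodes under it; otherwise target the single namenode directly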
if (StringUtils.isNotEmpty(haNameservice)) {
if (StringUtils.isEmpty(hostStandby)) {
throw new Exception(
"hostStandby in hdfs component not configured in SUT");
}
config.set("fs.defaultFS", "hdfs://" + haNameservice + "/");
config.set("dfs.client.failover.proxy.provider.mycluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
config.set("dfs.nameservices", haNameservice);
config.set("dfs.ha.namenodes" + "." + haNameservice, "nn1,nn2");
config.set(
"dfs.namenode.rpc-address." + haNameservice + ".nn1",
host + ":" + port);
config.set(
"dfs.namenode.rpc-address." + haNameservice + ".nn2",
hostStandby + ":" + port);
} else {
config.set("fs.defaultFS", "hdfs://" + host + ":" + port + "/");
}
}
// for Hadoop clusters provisioned in the cloud when running from a local workstation
if (Boolean.parseBoolean(useDatanodeHostname)) {
config.set("dfs.client.use.datanode.hostname", "true");
}
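// allow the client to fall back to simple authentication when the cluster is not Kerberized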
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
fs = FileSystem.get(config);
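// cache the file system defaults (replication, block size, buffer size) and report them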
setDefaultReplicationSize();
setDefaultBlockSize();
setDefaultBufferSize();
ReportUtils.report(report, getClass(), "Block Size: " + getBlockSize());
ReportUtils.report(report, getClass(), "Replications: "
+ getReplicationSize());
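// when an SSH user is configured, open a shell session to the namenode so commands such as kinit can be run there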
if (getSshUserName() != null) {
ReportUtils.report(report, getClass(), "Opening connection to namenode " + getHost());
namenodeSso = new ShellSystemObject(report.isSilent());
String namenodeHost = getHost();
if (namenodeHost != null && namenodeHost.equals("ipa-hadoop")) {
// this is for local testing, where the hostname in the SUT will be "ipa-hadoop";
// tests in CI substitute it with a short hostname
namenodeHost = getHostForConfiguredNameNode1HA();
}
namenodeSso.setHost(namenodeHost);
namenodeSso.setUserName(getSshUserName());
namenodeSso.setPrivateKey(getSshPrivateKey());
namenodeSso.init();
// source environment file
namenodeSso.runCommand("source ~/.bash_profile");
namenodePrincipal = config.get("dfs.namenode.kerberos.principal");
namenodeKeytab = config.get("dfs.namenode.keytab.file");
if (namenodePrincipal != null) {
// substitute the _HOST portion of the principal with the namenode FQDN; take it from the
// configuration, since namenodeHost might contain a short hostname
namenodePrincipal = namenodePrincipal.replace("_HOST", getHostForConfiguredNameNode1HA());
// kinit as the principal to be ready to perform HDFS commands later
// e.g. "kinit -kt /opt/security/keytab/hdfs.service.keytab hdfs/ccp-user-nn01.c.gcp-project.internal"
String kinitCommand = "kinit -kt " + namenodeKeytab + " " + namenodePrincipal;
namenodeSso.runCommand(kinitCommand);
}
}
ReportUtils.stopLevel(report);
}