in storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java [235:311]
private static NimbusClient createNimbusClient(Map<String, Object> conf, String asUser, Integer timeout) {
Nimbus.Iface override = _localOverrideClient;
if (override != null) {
return new NimbusClient(override);
}
Map<String, Object> fullConf = Utils.readStormConfig();
fullConf.putAll(Utils.readCommandLineOpts());
fullConf.putAll(conf);
conf = fullConf;
if (conf.containsKey(Config.STORM_DO_AS_USER)) {
if (asUser != null && !asUser.isEmpty()) {
LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.",
asUser, conf.get(Config.STORM_DO_AS_USER));
}
asUser = (String) conf.get(Config.STORM_DO_AS_USER);
}
if (asUser == null || asUser.isEmpty()) {
//The user is not set so lets see what the request context is.
ReqContext context = ReqContext.context();
Principal principal = context.principal();
asUser = principal == null ? null : principal.getName();
LOG.debug("Will impersonate {} based off of request context.", asUser);
}
List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
boolean useTls = ObjectReader.getBoolean(conf.get(Config.NIMBUS_THRIFT_CLIENT_USE_TLS), false);
if (useTls && null == ObjectReader.getString(conf.get(Config.NIMBUS_THRIFT_TLS_TRANSPORT_PLUGIN))) {
throw new RuntimeException(Config.NIMBUS_THRIFT_TLS_TRANSPORT_PLUGIN
+ " must be set to use a transport plugin that supports tls");
}
int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
int tlsPort = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_TLS_PORT).toString());
int configuredPortToUse = useTls ? tlsPort : port;
for (String host : seeds) {
NimbusSummary nimbusSummary;
NimbusClient client = null;
try {
client = new NimbusClient(conf, host, configuredPortToUse, timeout, asUser, useTls);
nimbusSummary = client.getClient().getLeader();
if (nimbusSummary != null) {
String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port() + ":" + nimbusSummary.get_tlsPort();
if (shouldLogLeader(leaderNimbus)) {
LOG.info("Found leader nimbus : {}", leaderNimbus);
}
int nimbusPortFromSummary = useTls ? nimbusSummary.get_tlsPort() : nimbusSummary.get_port();
if (nimbusSummary.get_host().equals(host) && nimbusPortFromSummary == port) {
NimbusClient ret = client;
client = null;
return ret;
}
try {
return new NimbusClient(conf, nimbusSummary.get_host(), nimbusPortFromSummary, timeout, asUser, useTls);
} catch (TTransportException e) {
throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
}
}
} catch (Exception e) {
LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
+ ". will retry with a different seed host.", e);
continue;
} finally {
if (client != null) {
client.close();
}
}
throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.");
}
throw new NimbusLeaderNotFoundException(
"Could not find leader nimbus from seed hosts " + seeds + ". "
+ "Did you specify a valid list of nimbus hosts for config "
+ Config.NIMBUS_SEEDS + "?");
}