private static NimbusClient createNimbusClient()

in storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java [235:311]


    private static NimbusClient createNimbusClient(Map<String, Object> conf, String asUser, Integer timeout) {
        Nimbus.Iface override = _localOverrideClient;
        if (override != null) {
            return new NimbusClient(override);
        }
        Map<String, Object> fullConf = Utils.readStormConfig();
        fullConf.putAll(Utils.readCommandLineOpts());
        fullConf.putAll(conf);
        conf = fullConf;
        if (conf.containsKey(Config.STORM_DO_AS_USER)) {
            if (asUser != null && !asUser.isEmpty()) {
                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.",
                         asUser, conf.get(Config.STORM_DO_AS_USER));
            }
            asUser = (String) conf.get(Config.STORM_DO_AS_USER);
        }

        if (asUser == null || asUser.isEmpty()) {
            //The user is not set so lets see what the request context is.
            ReqContext context = ReqContext.context();
            Principal principal = context.principal();
            asUser = principal == null ? null : principal.getName();
            LOG.debug("Will impersonate {} based off of request context.", asUser);
        }

        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);

        boolean useTls = ObjectReader.getBoolean(conf.get(Config.NIMBUS_THRIFT_CLIENT_USE_TLS), false);
        if (useTls && null == ObjectReader.getString(conf.get(Config.NIMBUS_THRIFT_TLS_TRANSPORT_PLUGIN))) {
            throw new RuntimeException(Config.NIMBUS_THRIFT_TLS_TRANSPORT_PLUGIN
                    + " must be set to use a transport plugin that supports tls");
        }

        int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
        int tlsPort = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_TLS_PORT).toString());
        int configuredPortToUse = useTls ? tlsPort : port;

        for (String host : seeds) {
            NimbusSummary nimbusSummary;
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, host, configuredPortToUse, timeout, asUser, useTls);
                nimbusSummary = client.getClient().getLeader();
                if (nimbusSummary != null) {
                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port() + ":" + nimbusSummary.get_tlsPort();
                    if (shouldLogLeader(leaderNimbus)) {
                        LOG.info("Found leader nimbus : {}", leaderNimbus);
                    }

                    int nimbusPortFromSummary = useTls ? nimbusSummary.get_tlsPort() : nimbusSummary.get_port();
                    if (nimbusSummary.get_host().equals(host) && nimbusPortFromSummary == port) {
                        NimbusClient ret = client;
                        client = null;
                        return ret;
                    }
                    try {
                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusPortFromSummary, timeout, asUser, useTls);
                    } catch (TTransportException e) {
                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                         + ". will retry with a different seed host.", e);
                continue;
            } finally {
                if (client != null) {
                    client.close();
                }
            }
            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.");
        }
        throw new NimbusLeaderNotFoundException(
            "Could not find leader nimbus from seed hosts " + seeds + ". "
            + "Did you specify a valid list of nimbus hosts for config "
            + Config.NIMBUS_SEEDS + "?");
    }